#!/usr/bin/env python
# Copyright (c) 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
import json
from tests.unit import unittest
from tests.unit import AWSMockServiceTestCase
from mock import Mock

from boto.sns.connection import SNSConnection

# Canned queue-attributes mapping handed back by the mocked queue's
# get_attributes(); its 'Policy' entry is a JSON document that already
# holds exactly one statement.
QUEUE_POLICY = {
    u'Policy': (
        u'{"Version":"2008-10-17","Id":"arn:aws:sqs:us-east-1:'
        'idnum:testqueuepolicy/SQSDefaultPolicy","Statement":'
        '[{"Sid":"sidnum","Effect":"Allow","Principal":{"AWS":"*"},'
        '"Action":"SQS:GetQueueUrl","Resource":'
        '"arn:aws:sqs:us-east-1:idnum:testqueuepolicy"}]}'
    ),
}


class TestSNSConnection(AWSMockServiceTestCase):
    """Check the request parameters SNSConnection sends for each call."""

    connection_class = SNSConnection

    def setUp(self):
        # Nothing extra to prepare; defer entirely to the base class.
        super(TestSNSConnection, self).setUp()

    def default_body(self):
        # Response body used for these tests: an empty JSON object.
        return b"{}"

    def _subscribe_and_load_policy(self, queue_attributes):
        """Subscribe a mocked queue to 'topic_arn' and return its new policy.

        ``queue_attributes`` is what the mocked queue's ``get_attributes``
        returns. The Subscribe request parameters are asserted here, and
        the second positional argument passed to the queue's
        ``set_attribute`` is decoded from JSON and returned.
        """
        self.set_http_response(status_code=200)

        sqs_queue = Mock()
        sqs_queue.arn = 'arn:aws:sqs:us-east-1:idnum:queuename'
        sqs_queue.get_attributes.return_value = queue_attributes

        self.service_connection.subscribe_sqs_queue('topic_arn', sqs_queue)

        expected_params = {
            'Action': 'Subscribe',
            'ContentType': 'JSON',
            'Endpoint': 'arn:aws:sqs:us-east-1:idnum:queuename',
            'Protocol': 'sqs',
            'TopicArn': 'topic_arn',
            'Version': '2010-03-31',
        }
        self.assert_request_parameters(expected_params,
                                       ignore_params_values=[])

        policy_json = sqs_queue.set_attribute.call_args[0][1]
        return json.loads(policy_json)

    def test_sqs_with_existing_policy(self):
        # Verify that the queue policy was properly updated.
        updated_policy = self._subscribe_and_load_policy(QUEUE_POLICY)
        self.assertEqual(updated_policy['Version'], '2008-10-17')

        # A new statement should be appended to the end of the statement
        # list.
        statements = updated_policy['Statement']
        self.assertEqual(len(statements), 2)
        self.assertEqual(statements[1]['Action'], 'SQS:SendMessage')

    def test_sqs_with_no_previous_policy(self):
        created_policy = self._subscribe_and_load_policy({})
        # Only a single statement should be part of the policy.
        self.assertEqual(len(created_policy['Statement']), 1)

    def test_publish_with_positional_args(self):
        # topic, message and subject supplied positionally, in that order.
        self.set_http_response(status_code=200)

        self.service_connection.publish('topic', 'message', 'subject')

        expected_params = {
            'Action': 'Publish',
            'TopicArn': 'topic',
            'Subject': 'subject',
            'Message': 'message',
        }
        self.assert_request_parameters(
            expected_params,
            ignore_params_values=['Version', 'ContentType'])

    def test_publish_with_kwargs(self):
        # Same request as the positional variant, using keyword arguments.
        self.set_http_response(status_code=200)

        self.service_connection.publish(
            topic='topic', message='message', subject='subject')

        expected_params = {
            'Action': 'Publish',
            'TopicArn': 'topic',
            'Subject': 'subject',
            'Message': 'message',
        }
        self.assert_request_parameters(
            expected_params,
            ignore_params_values=['Version', 'ContentType'])

    def test_publish_with_target_arn(self):
        # A target_arn is expected to be sent as TargetArn.
        self.set_http_response(status_code=200)

        self.service_connection.publish(
            target_arn='target_arn', message='message', subject='subject')

        expected_params = {
            'Action': 'Publish',
            'TargetArn': 'target_arn',
            'Subject': 'subject',
            'Message': 'message',
        }
        self.assert_request_parameters(
            expected_params,
            ignore_params_values=['Version', 'ContentType'])

    def test_create_platform_application(self):
        self.set_http_response(status_code=200)

        app_attributes = {
            'PlatformPrincipal': 'a ssl certificate',
            'PlatformCredential': 'a private key'
        }
        self.service_connection.create_platform_application(
            name='MyApp', platform='APNS', attributes=app_attributes)

        # The expected entries are numbered with PlatformCredential first.
        expected_params = {
            'Action': 'CreatePlatformApplication',
            'Name': 'MyApp',
            'Platform': 'APNS',
            'Attributes.entry.1.key': 'PlatformCredential',
            'Attributes.entry.1.value': 'a private key',
            'Attributes.entry.2.key': 'PlatformPrincipal',
            'Attributes.entry.2.value': 'a ssl certificate',
        }
        self.assert_request_parameters(
            expected_params,
            ignore_params_values=['Version', 'ContentType'])

    def test_set_platform_application_attributes(self):
        self.set_http_response(status_code=200)

        app_attributes = {'PlatformPrincipal': 'a ssl certificate',
                          'PlatformCredential': 'a private key'}
        self.service_connection.set_platform_application_attributes(
            platform_application_arn='arn:myapp',
            attributes=app_attributes)

        expected_params = {
            'Action': 'SetPlatformApplicationAttributes',
            'PlatformApplicationArn': 'arn:myapp',
            'Attributes.entry.1.key': 'PlatformCredential',
            'Attributes.entry.1.value': 'a private key',
            'Attributes.entry.2.key': 'PlatformPrincipal',
            'Attributes.entry.2.value': 'a ssl certificate',
        }
        self.assert_request_parameters(
            expected_params,
            ignore_params_values=['Version', 'ContentType'])

    def test_create_platform_endpoint(self):
        self.set_http_response(status_code=200)

        endpoint_attributes = {'Enabled': False}
        self.service_connection.create_platform_endpoint(
            platform_application_arn='arn:myapp',
            token='abcde12345',
            custom_user_data='john',
            attributes=endpoint_attributes)

        expected_params = {
            'Action': 'CreatePlatformEndpoint',
            'PlatformApplicationArn': 'arn:myapp',
            'Token': 'abcde12345',
            'CustomUserData': 'john',
            'Attributes.entry.1.key': 'Enabled',
            'Attributes.entry.1.value': False,
        }
        self.assert_request_parameters(
            expected_params,
            ignore_params_values=['Version', 'ContentType'])

    def test_set_endpoint_attributes(self):
        self.set_http_response(status_code=200)

        endpoint_attributes = {'CustomUserData': 'john',
                               'Enabled': False}
        self.service_connection.set_endpoint_attributes(
            endpoint_arn='arn:myendpoint',
            attributes=endpoint_attributes)

        expected_params = {
            'Action': 'SetEndpointAttributes',
            'EndpointArn': 'arn:myendpoint',
            'Attributes.entry.1.key': 'CustomUserData',
            'Attributes.entry.1.value': 'john',
            'Attributes.entry.2.key': 'Enabled',
            'Attributes.entry.2.value': False,
        }
        self.assert_request_parameters(
            expected_params,
            ignore_params_values=['Version', 'ContentType'])

    def test_message_is_required(self):
        # Publishing without a message is expected to raise TypeError.
        self.set_http_response(status_code=200)

        connection = self.service_connection
        with self.assertRaises(TypeError):
            connection.publish(topic='topic', subject='subject')

    def test_publish_with_json(self):
        self.set_http_response(status_code=200)

        payload = {
            'default': 'Ignored.',
            'GCM': {
                'data': 'goes here',
            }
        }
        self.service_connection.publish(
            message=json.dumps(payload),
            message_structure='json',
            subject='subject',
            target_arn='target_arn'
        )

        expected_params = {
            'Action': 'Publish',
            'TargetArn': 'target_arn',
            'Subject': 'subject',
            'MessageStructure': 'json',
        }
        # The serialized Message is excluded from the parameter comparison
        # and checked below after decoding it back into a dict.
        self.assert_request_parameters(
            expected_params,
            ignore_params_values=['Version', 'ContentType', 'Message'])

        sent_message = json.loads(self.actual_request.params["Message"])
        self.assertDictEqual(
            sent_message,
            {"default": "Ignored.", "GCM": {"data": "goes here"}})

    def test_publish_with_utf8_message(self):
        self.set_http_response(status_code=200)

        # The same UTF-8 encoded value is used for subject and message.
        encoded_text = u'We \u2665 utf-8'.encode('utf-8')
        subject = encoded_text
        message = encoded_text
        self.service_connection.publish('topic', message, subject)

        expected_params = {
            'Action': 'Publish',
            'TopicArn': 'topic',
            'Subject': subject,
            'Message': message,
        }
        self.assert_request_parameters(
            expected_params,
            ignore_params_values=['Version', 'ContentType'])

    def test_publish_with_attributes(self):
        self.set_http_response(status_code=200)

        payload = {
            'default': 'Ignored.',
            'GCM': {
                'data': 'goes here',
            }
        }
        attributes = {
            'name1': {
                'data_type': 'Number',
                'string_value': '42'
            },
            'name2': {
                'data_type': 'String',
                'string_value': 'Bob'
            },
        }
        # sort_keys pins the serialized form so the Message parameter can
        # be compared as an exact string.
        self.service_connection.publish(
            message=json.dumps(payload, sort_keys=True),
            message_structure='json',
            subject='subject',
            target_arn='target_arn',
            message_attributes=attributes,
        )

        expected_params = {
            'Action': 'Publish',
            'TargetArn': 'target_arn',
            'Subject': 'subject',
            'Message': '{"GCM": {"data": "goes here"}, "default": "Ignored."}',
            'MessageStructure': 'json',
            'MessageAttributes.entry.1.Name': 'name1',
            'MessageAttributes.entry.1.Value.DataType': 'Number',
            'MessageAttributes.entry.1.Value.StringValue': '42',
            'MessageAttributes.entry.2.Name': 'name2',
            'MessageAttributes.entry.2.Value.DataType': 'String',
            'MessageAttributes.entry.2.Value.StringValue': 'Bob',
        }
        self.assert_request_parameters(
            expected_params,
            ignore_params_values=['Version', 'ContentType'])


if __name__ == '__main__':
    unittest.main()