#!/usr/bin/env python
"""Unit tests for the tag helpers on ``TaggedEC2Object``.

Each test drives ``add_tag``/``add_tags``/``remove_tag``/``remove_tags``
against a mocked EC2 connection and then checks two things:

* the query parameters of the request that was issued, and
* the contents of the object's local ``tags`` mapping afterwards.
"""

from tests.unit import unittest
from tests.unit import AWSMockServiceTestCase

from boto.ec2.connection import EC2Connection
from boto.ec2.ec2object import TaggedEC2Object


# Canned body returned by the mocked service for CreateTags calls.
CREATE_TAGS_RESPONSE = br"""<?xml version="1.0" encoding="UTF-8"?>
<CreateTagsResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
   <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
   <return>true</return>
</CreateTagsResponse>
"""


# Canned body returned by the mocked service for DeleteTags calls.
DELETE_TAGS_RESPONSE = br"""<?xml version="1.0" encoding="UTF-8"?>
<DeleteTagsResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
   <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
   <return>true</return>
</DeleteTagsResponse>
"""


# Request parameters excluded from the comparison in every test; only the
# tagging-specific parameters are asserted on.
IGNORED_REQUEST_PARAMS = [
    'AWSAccessKeyId',
    'SignatureMethod',
    'SignatureVersion',
    'Timestamp',
    'Version',
]

# Resource id assigned to every object under test.
RESOURCE_ID = "i-abcd1234"


def _new_tagged_object(connection, initial_tags):
    """Build a ``TaggedEC2Object`` with a fixed id and pre-seeded tags.

    :param connection: connection handed to the ``TaggedEC2Object``
        constructor (the tests pass ``self.service_connection``).
    :param initial_tags: dict of key/value pairs written into the
        object's ``tags`` mapping before the method under test runs.
    :return: the prepared ``TaggedEC2Object``.
    """
    tagged = TaggedEC2Object(connection)
    tagged.id = RESOURCE_ID
    for key, value in initial_tags.items():
        tagged.tags[key] = value
    return tagged


# NOTE: the test methods below are described with comments rather than
# docstrings so that the names reported by the test runner stay unchanged.

class TestAddTags(AWSMockServiceTestCase):
    """Checks for ``add_tag`` and ``add_tags``."""

    connection_class = EC2Connection

    def default_body(self):
        return CREATE_TAGS_RESPONSE

    def test_add_tag(self):
        # A single new tag is sent as Tag.1 and added next to the
        # tag that was already present locally.
        self.set_http_response(status_code=200)
        tagged = _new_tagged_object(
            self.service_connection,
            {"already_present_key": "already_present_value"})

        tagged.add_tag("new_key", "new_value")

        self.assert_request_parameters({
            'ResourceId.1': 'i-abcd1234',
            'Action': 'CreateTags',
            'Tag.1.Key': 'new_key',
            'Tag.1.Value': 'new_value'},
            ignore_params_values=IGNORED_REQUEST_PARAMS)

        self.assertEqual(tagged.tags, {
            "already_present_key": "already_present_value",
            "new_key": "new_value"})

    def test_add_tags(self):
        # Several tags go out in one CreateTags request and are all
        # merged into the local mapping.
        self.set_http_response(status_code=200)
        tagged = _new_tagged_object(
            self.service_connection,
            {"already_present_key": "already_present_value"})

        tagged.add_tags({"key1": "value1", "key2": "value2"})

        self.assert_request_parameters({
            'ResourceId.1': 'i-abcd1234',
            'Action': 'CreateTags',
            'Tag.1.Key': 'key1',
            'Tag.1.Value': 'value1',
            'Tag.2.Key': 'key2',
            'Tag.2.Value': 'value2'},
            ignore_params_values=IGNORED_REQUEST_PARAMS)

        self.assertEqual(tagged.tags, {
            "already_present_key": "already_present_value",
            "key1": "value1",
            "key2": "value2"})


class TestRemoveTags(AWSMockServiceTestCase):
    """Checks for ``remove_tag`` and ``remove_tags``."""

    connection_class = EC2Connection

    def default_body(self):
        return DELETE_TAGS_RESPONSE

    def test_remove_tag(self):
        # Key and matching value given: the tag is expected to be
        # dropped from the local mapping.
        self.set_http_response(status_code=200)
        tagged = _new_tagged_object(
            self.service_connection,
            {"key1": "value1", "key2": "value2"})

        tagged.remove_tag("key1", "value1")

        self.assert_request_parameters({
            'ResourceId.1': 'i-abcd1234',
            'Action': 'DeleteTags',
            'Tag.1.Key': 'key1',
            'Tag.1.Value': 'value1'},
            ignore_params_values=IGNORED_REQUEST_PARAMS)

        self.assertEqual(tagged.tags, {"key2": "value2"})

    def test_remove_tag_no_value(self):
        # No value given: no Tag.1.Value parameter is expected in the
        # request, and the key is expected to be dropped locally.
        self.set_http_response(status_code=200)
        tagged = _new_tagged_object(
            self.service_connection,
            {"key1": "value1", "key2": "value2"})

        tagged.remove_tag("key1")

        self.assert_request_parameters({
            'ResourceId.1': 'i-abcd1234',
            'Action': 'DeleteTags',
            'Tag.1.Key': 'key1'},
            ignore_params_values=IGNORED_REQUEST_PARAMS)

        self.assertEqual(tagged.tags, {"key2": "value2"})

    def test_remove_tag_empty_value(self):
        # An empty-string value is sent as-is; since it does not equal
        # the stored value, the local mapping is expected to be unchanged.
        self.set_http_response(status_code=200)
        tagged = _new_tagged_object(
            self.service_connection,
            {"key1": "value1", "key2": "value2"})

        tagged.remove_tag("key1", "")

        self.assert_request_parameters({
            'ResourceId.1': 'i-abcd1234',
            'Action': 'DeleteTags',
            'Tag.1.Key': 'key1',
            'Tag.1.Value': ''},
            ignore_params_values=IGNORED_REQUEST_PARAMS)

        self.assertEqual(tagged.tags,
                         {"key1": "value1", "key2": "value2"})

    def test_remove_tags(self):
        # Every key/value pair matches: the local mapping is expected
        # to end up empty.
        self.set_http_response(status_code=200)
        tagged = _new_tagged_object(
            self.service_connection,
            {"key1": "value1", "key2": "value2"})

        tagged.remove_tags({"key1": "value1", "key2": "value2"})

        self.assert_request_parameters({
            'ResourceId.1': 'i-abcd1234',
            'Action': 'DeleteTags',
            'Tag.1.Key': 'key1',
            'Tag.1.Value': 'value1',
            'Tag.2.Key': 'key2',
            'Tag.2.Value': 'value2'},
            ignore_params_values=IGNORED_REQUEST_PARAMS)

        self.assertEqual(tagged.tags, {})

    def test_remove_tags_wrong_values(self):
        # key2 is requested with a value that differs from the stored
        # one, so only key1 is expected to be dropped locally.
        self.set_http_response(status_code=200)
        tagged = _new_tagged_object(
            self.service_connection,
            {"key1": "value1", "key2": "value2"})

        tagged.remove_tags({"key1": "value1", "key2": "value3"})

        self.assert_request_parameters({
            'ResourceId.1': 'i-abcd1234',
            'Action': 'DeleteTags',
            'Tag.1.Key': 'key1',
            'Tag.1.Value': 'value1',
            'Tag.2.Key': 'key2',
            'Tag.2.Value': 'value3'},
            ignore_params_values=IGNORED_REQUEST_PARAMS)

        self.assertEqual(tagged.tags, {"key2": "value2"})

    def test_remove_tags_none_values(self):
        # A ``None`` value produces no Tag.N.Value parameter and is
        # expected to remove the key whatever its stored value is.
        self.set_http_response(status_code=200)
        tagged = _new_tagged_object(
            self.service_connection,
            {"key1": "value1", "key2": "value2"})

        tagged.remove_tags({"key1": "value1", "key2": None})

        self.assert_request_parameters({
            'ResourceId.1': 'i-abcd1234',
            'Action': 'DeleteTags',
            'Tag.1.Key': 'key1',
            'Tag.1.Value': 'value1',
            'Tag.2.Key': 'key2'},
            ignore_params_values=IGNORED_REQUEST_PARAMS)

        self.assertEqual(tagged.tags, {})


if __name__ == '__main__':
    unittest.main()