1from tests.unit import unittest
2from tests.unit import AWSMockServiceTestCase
3
4from boto.vpc import VPCConnection, RouteTable
5
6
class TestDescribeRouteTables(AWSMockServiceTestCase):
    """Exercise ``VPCConnection.get_all_route_tables`` against a canned reply.

    The canned DescribeRouteTables response contains two route tables:

    * ``rtb-13ad487a`` -- one local route and a single association that
      carries ``<main>true</main>`` and no subnet.
    * ``rtb-f9ad4890`` -- four routes (local gateway, internet gateway,
      network interface, VPC peering connection) and a single association
      bound to a subnet with no ``<main>`` element.
    """

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned DescribeRouteTables XML response body."""
        return b"""
            <DescribeRouteTablesResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-01/">
               <requestId>6f570b0b-9c18-4b07-bdec-73740dcf861a</requestId>
               <routeTableSet>
                  <item>
                     <routeTableId>rtb-13ad487a</routeTableId>
                     <vpcId>vpc-11ad4878</vpcId>
                     <routeSet>
                        <item>
                           <destinationCidrBlock>10.0.0.0/22</destinationCidrBlock>
                           <gatewayId>local</gatewayId>
                           <state>active</state>
                           <origin>CreateRouteTable</origin>
                        </item>
                     </routeSet>
                     <associationSet>
                         <item>
                            <routeTableAssociationId>rtbassoc-12ad487b</routeTableAssociationId>
                            <routeTableId>rtb-13ad487a</routeTableId>
                            <main>true</main>
                         </item>
                     </associationSet>
                    <tagSet/>
                  </item>
                  <item>
                     <routeTableId>rtb-f9ad4890</routeTableId>
                     <vpcId>vpc-11ad4878</vpcId>
                     <routeSet>
                        <item>
                           <destinationCidrBlock>10.0.0.0/22</destinationCidrBlock>
                           <gatewayId>local</gatewayId>
                           <state>active</state>
                           <origin>CreateRouteTable</origin>
                        </item>
                        <item>
                           <destinationCidrBlock>0.0.0.0/0</destinationCidrBlock>
                           <gatewayId>igw-eaad4883</gatewayId>
                           <state>active</state>
                        </item>
                        <item>
                            <destinationCidrBlock>10.0.0.0/21</destinationCidrBlock>
                            <networkInterfaceId>eni-884ec1d1</networkInterfaceId>
                            <state>blackhole</state>
                            <origin>CreateRoute</origin>
                        </item>
                        <item>
                            <destinationCidrBlock>11.0.0.0/22</destinationCidrBlock>
                            <vpcPeeringConnectionId>pcx-efc52b86</vpcPeeringConnectionId>
                            <state>blackhole</state>
                            <origin>CreateRoute</origin>
                        </item>
                     </routeSet>
                     <associationSet>
                        <item>
                            <routeTableAssociationId>rtbassoc-faad4893</routeTableAssociationId>
                            <routeTableId>rtb-f9ad4890</routeTableId>
                            <subnetId>subnet-15ad487c</subnetId>
                        </item>
                     </associationSet>
                     <tagSet/>
                  </item>
               </routeTableSet>
            </DescribeRouteTablesResponse>
        """

    def test_get_all_route_tables(self):
        # Checks both the serialized request (ids + filter) and the objects
        # parsed from the canned response.
        self.set_http_response(status_code=200)
        api_response = self.service_connection.get_all_route_tables(
            ['rtb-13ad487a', 'rtb-f9ad4890'], filters=[('route.state', 'active')])
        self.assert_request_parameters({
            'Action': 'DescribeRouteTables',
            'RouteTableId.1': 'rtb-13ad487a',
            'RouteTableId.2': 'rtb-f9ad4890',
            'Filter.1.Name': 'route.state',
            'Filter.1.Value.1': 'active'},
            ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
                                  'SignatureVersion', 'Timestamp',
                                  'Version'])
        self.assertEqual(len(api_response), 2)

        # First route table: one local route, one "main" association.
        self.assertIsInstance(api_response[0], RouteTable)
        self.assertEqual(api_response[0].id, 'rtb-13ad487a')
        self.assertEqual(len(api_response[0].routes), 1)
        self.assertEqual(api_response[0].routes[0].destination_cidr_block, '10.0.0.0/22')
        self.assertEqual(api_response[0].routes[0].gateway_id, 'local')
        self.assertEqual(api_response[0].routes[0].state, 'active')
        self.assertEqual(len(api_response[0].associations), 1)
        self.assertEqual(api_response[0].associations[0].id, 'rtbassoc-12ad487b')
        self.assertEqual(api_response[0].associations[0].route_table_id, 'rtb-13ad487a')
        # The fixture has no <subnetId> for this association.
        self.assertIsNone(api_response[0].associations[0].subnet_id)
        self.assertEqual(api_response[0].associations[0].main, True)

        # Second route table: four routes, each with a different target kind.
        self.assertEqual(api_response[1].id, 'rtb-f9ad4890')
        self.assertEqual(len(api_response[1].routes), 4)
        self.assertEqual(api_response[1].routes[0].destination_cidr_block, '10.0.0.0/22')
        self.assertEqual(api_response[1].routes[0].gateway_id, 'local')
        self.assertEqual(api_response[1].routes[0].state, 'active')
        self.assertEqual(api_response[1].routes[1].destination_cidr_block, '0.0.0.0/0')
        self.assertEqual(api_response[1].routes[1].gateway_id, 'igw-eaad4883')
        self.assertEqual(api_response[1].routes[1].state, 'active')
        self.assertEqual(api_response[1].routes[2].destination_cidr_block, '10.0.0.0/21')
        self.assertEqual(api_response[1].routes[2].interface_id, 'eni-884ec1d1')
        self.assertEqual(api_response[1].routes[2].state, 'blackhole')
        self.assertEqual(api_response[1].routes[3].destination_cidr_block, '11.0.0.0/22')
        self.assertEqual(api_response[1].routes[3].vpc_peering_connection_id, 'pcx-efc52b86')
        self.assertEqual(api_response[1].routes[3].state, 'blackhole')
        self.assertEqual(len(api_response[1].associations), 1)
        self.assertEqual(api_response[1].associations[0].id, 'rtbassoc-faad4893')
        self.assertEqual(api_response[1].associations[0].route_table_id, 'rtb-f9ad4890')
        self.assertEqual(api_response[1].associations[0].subnet_id, 'subnet-15ad487c')
        # No <main> element in the fixture for this association.
        self.assertEqual(api_response[1].associations[0].main, False)
121
122
class TestAssociateRouteTable(AWSMockServiceTestCase):
    """Exercise ``VPCConnection.associate_route_table`` against a canned reply."""

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned AssociateRouteTable XML response body."""
        return b"""
            <AssociateRouteTableResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-01/">
               <requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
               <associationId>rtbassoc-f8ad4891</associationId>
            </AssociateRouteTableResponse>
        """

    def test_associate_route_table(self):
        # The call is expected to return the <associationId> from the reply.
        self.set_http_response(status_code=200)
        api_response = self.service_connection.associate_route_table(
            'rtb-e4ad488d', 'subnet-15ad487c')
        self.assert_request_parameters({
            'Action': 'AssociateRouteTable',
            'RouteTableId': 'rtb-e4ad488d',
            'SubnetId': 'subnet-15ad487c'},
            ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
                                  'SignatureVersion', 'Timestamp',
                                  'Version'])
        self.assertEqual(api_response, 'rtbassoc-f8ad4891')
147
148
class TestDisassociateRouteTable(AWSMockServiceTestCase):
    """Exercise ``VPCConnection.disassociate_route_table`` against a canned reply."""

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned DisassociateRouteTable XML response body."""
        return b"""
            <DisassociateRouteTableResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-01/">
               <requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
               <return>true</return>
            </DisassociateRouteTableResponse>
        """

    def test_disassociate_route_table(self):
        # The canned reply carries <return>true</return>; the call should
        # surface that as True.
        self.set_http_response(status_code=200)
        api_response = self.service_connection.disassociate_route_table('rtbassoc-fdad4894')
        self.assert_request_parameters({
            'Action': 'DisassociateRouteTable',
            'AssociationId': 'rtbassoc-fdad4894'},
            ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
                                  'SignatureVersion', 'Timestamp',
                                  'Version'])
        self.assertEqual(api_response, True)
171
172
class TestCreateRouteTable(AWSMockServiceTestCase):
    """Exercise ``VPCConnection.create_route_table`` against a canned reply."""

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned CreateRouteTable XML response body."""
        return b"""
            <CreateRouteTableResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-01/">
               <requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
               <routeTable>
                  <routeTableId>rtb-f9ad4890</routeTableId>
                  <vpcId>vpc-11ad4878</vpcId>
                  <routeSet>
                     <item>
                        <destinationCidrBlock>10.0.0.0/22</destinationCidrBlock>
                        <gatewayId>local</gatewayId>
                        <state>active</state>
                     </item>
                  </routeSet>
                  <associationSet/>
                  <tagSet/>
               </routeTable>
            </CreateRouteTableResponse>
        """

    def test_create_route_table(self):
        # The call should yield a RouteTable populated from <routeTable>,
        # including its single local route.
        self.set_http_response(status_code=200)
        api_response = self.service_connection.create_route_table('vpc-11ad4878')
        self.assert_request_parameters({
            'Action': 'CreateRouteTable',
            'VpcId': 'vpc-11ad4878'},
            ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
                                  'SignatureVersion', 'Timestamp',
                                  'Version'])
        self.assertIsInstance(api_response, RouteTable)
        self.assertEqual(api_response.id, 'rtb-f9ad4890')
        self.assertEqual(len(api_response.routes), 1)
        self.assertEqual(api_response.routes[0].destination_cidr_block, '10.0.0.0/22')
        self.assertEqual(api_response.routes[0].gateway_id, 'local')
        self.assertEqual(api_response.routes[0].state, 'active')
212
213
class TestDeleteRouteTable(AWSMockServiceTestCase):
    """Exercise ``VPCConnection.delete_route_table`` against a canned reply."""

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned DeleteRouteTable XML response body."""
        return b"""
            <DeleteRouteTableResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-01/">
               <requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
               <return>true</return>
            </DeleteRouteTableResponse>
        """

    def test_delete_route_table(self):
        # The canned reply carries <return>true</return>; the call should
        # surface that as True.
        self.set_http_response(status_code=200)
        api_response = self.service_connection.delete_route_table('rtb-e4ad488d')
        self.assert_request_parameters({
            'Action': 'DeleteRouteTable',
            'RouteTableId': 'rtb-e4ad488d'},
            ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
                                  'SignatureVersion', 'Timestamp',
                                  'Version'])
        self.assertEqual(api_response, True)
236
237
class TestReplaceRouteTableAssociation(AWSMockServiceTestCase):
    """Exercise both ReplaceRouteTableAssociation entry points.

    Two connection methods are tested against the same canned reply:
    ``replace_route_table_assocation`` (spelled exactly as the method this
    test calls) is expected to return ``True``, while
    ``replace_route_table_association_with_assoc`` is expected to return the
    new association id.
    """

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned ReplaceRouteTableAssociation XML response body."""
        return b"""
            <ReplaceRouteTableAssociationResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-01/">
               <requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
               <newAssociationId>rtbassoc-faad4893</newAssociationId>
            </ReplaceRouteTableAssociationResponse>
        """

    def test_replace_route_table_assocation(self):
        # Boolean-returning variant of the call.
        self.set_http_response(status_code=200)
        api_response = self.service_connection.replace_route_table_assocation(
            'rtbassoc-faad4893', 'rtb-f9ad4890')
        self.assert_request_parameters({
            'Action': 'ReplaceRouteTableAssociation',
            'AssociationId': 'rtbassoc-faad4893',
            'RouteTableId': 'rtb-f9ad4890'},
            ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
                                  'SignatureVersion', 'Timestamp',
                                  'Version'])
        self.assertEqual(api_response, True)

    def test_replace_route_table_association_with_assoc(self):
        # Variant that returns the <newAssociationId> from the reply.
        self.set_http_response(status_code=200)
        api_response = self.service_connection.replace_route_table_association_with_assoc(
            'rtbassoc-faad4893', 'rtb-f9ad4890')
        self.assert_request_parameters({
            'Action': 'ReplaceRouteTableAssociation',
            'AssociationId': 'rtbassoc-faad4893',
            'RouteTableId': 'rtb-f9ad4890'},
            ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
                                  'SignatureVersion', 'Timestamp',
                                  'Version'])
        self.assertEqual(api_response, 'rtbassoc-faad4893')
275
276
class TestCreateRoute(AWSMockServiceTestCase):
    """Exercise ``VPCConnection.create_route`` with each supported target.

    One test per target keyword (``gateway_id``, ``instance_id``,
    ``interface_id``, ``vpc_peering_connection_id``); each verifies the
    request parameter that the keyword is serialized to.
    """

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned CreateRoute XML response body."""
        return b"""
            <CreateRouteResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-01/">
               <requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
               <return>true</return>
            </CreateRouteResponse>
        """

    def test_create_route_gateway(self):
        # gateway_id -> 'GatewayId'
        self.set_http_response(status_code=200)
        api_response = self.service_connection.create_route(
            'rtb-e4ad488d', '0.0.0.0/0', gateway_id='igw-eaad4883')
        self.assert_request_parameters({
            'Action': 'CreateRoute',
            'RouteTableId': 'rtb-e4ad488d',
            'DestinationCidrBlock': '0.0.0.0/0',
            'GatewayId': 'igw-eaad4883'},
            ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
                                  'SignatureVersion', 'Timestamp',
                                  'Version'])
        self.assertEqual(api_response, True)

    def test_create_route_instance(self):
        # instance_id -> 'InstanceId'
        self.set_http_response(status_code=200)
        api_response = self.service_connection.create_route(
            'rtb-g8ff4ea2', '0.0.0.0/0', instance_id='i-1a2b3c4d')
        self.assert_request_parameters({
            'Action': 'CreateRoute',
            'RouteTableId': 'rtb-g8ff4ea2',
            'DestinationCidrBlock': '0.0.0.0/0',
            'InstanceId': 'i-1a2b3c4d'},
            ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
                                  'SignatureVersion', 'Timestamp',
                                  'Version'])
        self.assertEqual(api_response, True)

    def test_create_route_interface(self):
        # interface_id -> 'NetworkInterfaceId'
        self.set_http_response(status_code=200)
        api_response = self.service_connection.create_route(
            'rtb-g8ff4ea2', '0.0.0.0/0', interface_id='eni-1a2b3c4d')
        self.assert_request_parameters({
            'Action': 'CreateRoute',
            'RouteTableId': 'rtb-g8ff4ea2',
            'DestinationCidrBlock': '0.0.0.0/0',
            'NetworkInterfaceId': 'eni-1a2b3c4d'},
            ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
                                  'SignatureVersion', 'Timestamp',
                                  'Version'])
        self.assertEqual(api_response, True)

    def test_create_route_vpc_peering_connection(self):
        # vpc_peering_connection_id -> 'VpcPeeringConnectionId'
        self.set_http_response(status_code=200)
        api_response = self.service_connection.create_route(
            'rtb-g8ff4ea2', '0.0.0.0/0', vpc_peering_connection_id='pcx-1a2b3c4d')
        self.assert_request_parameters({
            'Action': 'CreateRoute',
            'RouteTableId': 'rtb-g8ff4ea2',
            'DestinationCidrBlock': '0.0.0.0/0',
            'VpcPeeringConnectionId': 'pcx-1a2b3c4d'},
            ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
                                  'SignatureVersion', 'Timestamp',
                                  'Version'])
        self.assertEqual(api_response, True)
344
345
class TestReplaceRoute(AWSMockServiceTestCase):
    """Exercise ``VPCConnection.replace_route`` with each supported target.

    One test per target keyword (``gateway_id``, ``instance_id``,
    ``interface_id``, ``vpc_peering_connection_id``); each verifies the
    request parameter that the keyword is serialized to.
    """

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned ReplaceRoute XML response body."""
        # The root element is named after the action under test; it was
        # previously a copy-pasted <CreateRouteResponse>.
        return b"""
            <ReplaceRouteResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-01/">
               <requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
               <return>true</return>
            </ReplaceRouteResponse>
        """

    def test_replace_route_gateway(self):
        # gateway_id -> 'GatewayId'
        self.set_http_response(status_code=200)
        api_response = self.service_connection.replace_route(
            'rtb-e4ad488d', '0.0.0.0/0', gateway_id='igw-eaad4883')
        self.assert_request_parameters({
            'Action': 'ReplaceRoute',
            'RouteTableId': 'rtb-e4ad488d',
            'DestinationCidrBlock': '0.0.0.0/0',
            'GatewayId': 'igw-eaad4883'},
            ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
                                  'SignatureVersion', 'Timestamp',
                                  'Version'])
        self.assertEqual(api_response, True)

    def test_replace_route_instance(self):
        # instance_id -> 'InstanceId'
        self.set_http_response(status_code=200)
        api_response = self.service_connection.replace_route(
            'rtb-g8ff4ea2', '0.0.0.0/0', instance_id='i-1a2b3c4d')
        self.assert_request_parameters({
            'Action': 'ReplaceRoute',
            'RouteTableId': 'rtb-g8ff4ea2',
            'DestinationCidrBlock': '0.0.0.0/0',
            'InstanceId': 'i-1a2b3c4d'},
            ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
                                  'SignatureVersion', 'Timestamp',
                                  'Version'])
        self.assertEqual(api_response, True)

    def test_replace_route_interface(self):
        # interface_id -> 'NetworkInterfaceId'
        self.set_http_response(status_code=200)
        api_response = self.service_connection.replace_route(
            'rtb-g8ff4ea2', '0.0.0.0/0', interface_id='eni-1a2b3c4d')
        self.assert_request_parameters({
            'Action': 'ReplaceRoute',
            'RouteTableId': 'rtb-g8ff4ea2',
            'DestinationCidrBlock': '0.0.0.0/0',
            'NetworkInterfaceId': 'eni-1a2b3c4d'},
            ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
                                  'SignatureVersion', 'Timestamp',
                                  'Version'])
        self.assertEqual(api_response, True)

    def test_replace_route_vpc_peering_connection(self):
        # vpc_peering_connection_id -> 'VpcPeeringConnectionId'
        self.set_http_response(status_code=200)
        api_response = self.service_connection.replace_route(
            'rtb-g8ff4ea2', '0.0.0.0/0', vpc_peering_connection_id='pcx-1a2b3c4d')
        self.assert_request_parameters({
            'Action': 'ReplaceRoute',
            'RouteTableId': 'rtb-g8ff4ea2',
            'DestinationCidrBlock': '0.0.0.0/0',
            'VpcPeeringConnectionId': 'pcx-1a2b3c4d'},
            ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
                                  'SignatureVersion', 'Timestamp',
                                  'Version'])
        self.assertEqual(api_response, True)
413
414
class TestDeleteRoute(AWSMockServiceTestCase):
    """Exercise ``VPCConnection.delete_route`` against a canned reply."""

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned DeleteRoute XML response body."""
        # The root element is named after the action under test; it was
        # previously a copy-pasted <DeleteRouteTableResponse>.
        return b"""
            <DeleteRouteResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-01/">
               <requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
               <return>true</return>
            </DeleteRouteResponse>
        """

    def test_delete_route(self):
        # The canned reply carries <return>true</return>; the call should
        # surface that as True.
        self.set_http_response(status_code=200)
        api_response = self.service_connection.delete_route('rtb-e4ad488d', '172.16.1.0/24')
        self.assert_request_parameters({
            'Action': 'DeleteRoute',
            'RouteTableId': 'rtb-e4ad488d',
            'DestinationCidrBlock': '172.16.1.0/24'},
            ignore_params_values=['AWSAccessKeyId', 'SignatureMethod',
                                  'SignatureVersion', 'Timestamp',
                                  'Version'])
        self.assertEqual(api_response, True)
438
# Run the test cases in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
441