1#
2# Copyright (c) 2011 Thomas Graf <tgraf@suug.ch>
3#
4
5"""Module providing access to network addresses
6"""
7
8from __future__ import absolute_import
9
10
11__version__ = '1.0'
12__all__ = [
13    'AddressCache',
14    'Address']
15
16import datetime
17from .. import core as netlink
18from .  import capi as capi
19from .  import link as Link
20from .. import util as util
21
class AddressCache(netlink.Cache):
    """Cache containing network addresses"""

    def __init__(self, cache=None):
        """Wrap an existing address cache or allocate a new one.

        cache -- underlying cache handle to wrap. When omitted (or
                 falsy) a 'route/addr' cache is allocated through
                 _alloc_cache_name().
        """
        if not cache:
            cache = self._alloc_cache_name('route/addr')

        self._protocol = netlink.NETLINK_ROUTE
        self._nl_cache = cache

    def __getitem__(self, key):
        """Return the address whose local address matches `key`.

        Equivalent to lookup(0, key); raises KeyError if there is no
        matching address.
        """
        # Using ifindex=0 here implies that the local address itself
        # is unique, otherwise the first occurrence is returned.
        return self.lookup(0, key)

    def lookup(self, ifindex, local):
        """Look up an address by interface index and local address.

        ifindex -- interface index handed to rtnl_addr_get()
        local   -- local address, either a string (converted to a
                   netlink.AbstractAddress) or an object exposing
                   `_nl_addr`

        Returns an Address wrapping the entry found.
        Raises KeyError if rtnl_addr_get() returns None.
        """
        # isinstance() also accepts str subclasses, which the previous
        # exact type check passed on unconverted.
        if isinstance(local, str):
            local = netlink.AbstractAddress(local)

        addr = capi.rtnl_addr_get(self._nl_cache, ifindex, local._nl_addr)
        if addr is None:
            # Carry the address that was searched for so the error is
            # meaningful to the caller.
            raise KeyError(local)

        return Address._from_capi(addr)

    @staticmethod
    def _new_object(obj):
        """Wrap `obj` in an Address."""
        return Address(obj)

    @staticmethod
    def _new_cache(cache):
        """Wrap `cache` in a new AddressCache."""
        return AddressCache(cache=cache)
55
class Address(netlink.Object):
    """Network address"""

    def __init__(self, obj=None):
        """Create an address object, optionally wrapping `obj`.

        obj -- object handle forwarded to netlink.Object.__init__();
               defaults to None.
        """
        netlink.Object.__init__(self, 'route/addr', 'address', obj)
        # Typed handle used by all capi.rtnl_addr_*() calls below.
        self._rtnl_addr = self._obj2type(self._nl_object)

    @classmethod
    def _from_capi(cls, obj):
        """Build an instance from a capi address handle."""
        return cls(capi.addr2obj(obj))

    @staticmethod
    def _obj2type(obj):
        """Convert a generic object handle via capi.obj2addr()."""
        return capi.obj2addr(obj)

    def __cmp__(self, other):
        """Three-way comparison used for sorting addresses.

        Returns the first non-zero difference of ifindex, family and
        local address, or 0 if all of them compare equal.

        Note that Python 3 does not call __cmp__; it is only honoured
        by Python 2.
        """
        # sort by:
        #    1. network link
        #    2. address family
        #    3. local address (including prefixlen)
        diff = self.ifindex - other.ifindex

        if diff == 0:
            diff = self.family - other.family
            if diff == 0:
                # NOTE(review): `local` yields netlink.AbstractAddress
                # objects; confirm nl_addr_cmp() accepts them rather
                # than their `_nl_addr` handles.
                diff = capi.nl_addr_cmp(self.local, other.local)

        return diff

    @staticmethod
    def _new_instance(obj):
        """Wrap `obj` in a new Address."""
        return Address(obj)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def ifindex(self):
        """interface index"""
        return capi.rtnl_addr_get_ifindex(self._rtnl_addr)

    @ifindex.setter
    def ifindex(self, value):
        # The index is not stored directly: it is resolved to a link
        # which is then assigned through the `link` property.
        link = Link.resolve(value)
        if not link:
            raise ValueError()

        self.link = link

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def link(self):
        """Link the address is assigned to, or None if there is none"""
        link = capi.rtnl_addr_get_link(self._rtnl_addr)
        if not link:
            return None

        return Link.Link.from_capi(link)

    @link.setter
    def link(self, value):
        # A string is resolved through Link.resolve(); a failed
        # resolution (KeyError) is reported as ValueError.
        if isinstance(value, str):
            try:
                value = Link.resolve(value)
            except KeyError:
                raise ValueError()

        capi.rtnl_addr_set_link(self._rtnl_addr, value._rtnl_link)

        # ifindex is immutable but we assume that if _orig does not
        # have an ifindex specified, it was meant to be given here
        if capi.rtnl_addr_get_ifindex(self._orig) == 0:
            capi.rtnl_addr_set_ifindex(self._orig, value.ifindex)

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def label(self):
        """address label"""
        return capi.rtnl_addr_get_label(self._rtnl_addr)

    @label.setter
    def label(self, value):
        capi.rtnl_addr_set_label(self._rtnl_addr, value)

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def flags(self):
        """Flags

        Reading returns the flag names as a list of strings.

        Setting this property does *not* replace the current flags
        with the value you supply; each supplied flag is added or
        removed individually.

        Examples:
        addr.flags = '+xxx' # add xxx flag
        addr.flags = 'xxx'  # exactly the same
        addr.flags = '-xxx' # remove xxx flag
        addr.flags = [ '+xxx', '-yyy' ] # list operation
        """
        flags = capi.rtnl_addr_get_flags(self._rtnl_addr)
        return capi.rtnl_addr_flags2str(flags, 256)[0].split(',')

    def _set_flag(self, flag):
        """Apply a single flag operation.

        '-name' removes the flag, '+name' and plain 'name' add it.
        """
        if flag.startswith('-'):
            mask = capi.rtnl_addr_str2flags(flag[1:])
            capi.rtnl_addr_unset_flags(self._rtnl_addr, mask)
            return

        # A leading '+' is optional when adding a flag.
        name = flag[1:] if flag.startswith('+') else flag
        mask = capi.rtnl_addr_str2flags(name)
        capi.rtnl_addr_set_flags(self._rtnl_addr, mask)

    @flags.setter
    def flags(self, value):
        # Lists (and tuples) are applied element by element, in order;
        # anything else is treated as a single flag string.
        if isinstance(value, (list, tuple)):
            for flag in value:
                self._set_flag(flag)
        else:
            self._set_flag(value)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def family(self):
        """Address family"""
        fam = capi.rtnl_addr_get_family(self._rtnl_addr)
        return netlink.AddressFamily(fam)

    @family.setter
    def family(self, value):
        # Normalise to AddressFamily first, then store its int() value.
        if not isinstance(value, netlink.AddressFamily):
            value = netlink.AddressFamily(value)

        capi.rtnl_addr_set_family(self._rtnl_addr, int(value))

    @property
    @netlink.nlattr(type=int, fmt=util.num)
    def scope(self):
        """Address scope"""
        scope = capi.rtnl_addr_get_scope(self._rtnl_addr)
        return capi.rtnl_scope2str(scope, 32)[0]

    @scope.setter
    def scope(self, value):
        # Strings are translated with rtnl_str2scope(); other values
        # are passed through unchanged.
        if isinstance(value, str):
            value = capi.rtnl_str2scope(value)
        capi.rtnl_addr_set_scope(self._rtnl_addr, value)

    @property
    @netlink.nlattr(type=str, immutable=True, fmt=util.addr)
    def local(self):
        """Local address"""
        a = capi.rtnl_addr_get_local(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @local.setter
    def local(self, value):
        a = netlink.AbstractAddress(value)
        capi.rtnl_addr_set_local(self._rtnl_addr, a._nl_addr)

        # local is immutable but we assume that if _orig does not
        # have a local address specified, it was meant to be given here
        if capi.rtnl_addr_get_local(self._orig) is None:
            capi.rtnl_addr_set_local(self._orig, a._nl_addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def peer(self):
        """Peer address"""
        a = capi.rtnl_addr_get_peer(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @peer.setter
    def peer(self, value):
        a = netlink.AbstractAddress(value)
        capi.rtnl_addr_set_peer(self._rtnl_addr, a._nl_addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def broadcast(self):
        """Broadcast address"""
        a = capi.rtnl_addr_get_broadcast(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @broadcast.setter
    def broadcast(self, value):
        a = netlink.AbstractAddress(value)
        capi.rtnl_addr_set_broadcast(self._rtnl_addr, a._nl_addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def multicast(self):
        """multicast address"""
        a = capi.rtnl_addr_get_multicast(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @multicast.setter
    def multicast(self, value):
        # Unlike the other address setters, a ValueError from the
        # conversion is re-raised as AttributeError here.
        try:
            a = netlink.AbstractAddress(value)
        except ValueError as err:
            raise AttributeError('multicast', err)

        capi.rtnl_addr_set_multicast(self._rtnl_addr, a._nl_addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def anycast(self):
        """anycast address"""
        a = capi.rtnl_addr_get_anycast(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @anycast.setter
    def anycast(self, value):
        a = netlink.AbstractAddress(value)
        capi.rtnl_addr_set_anycast(self._rtnl_addr, a._nl_addr)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def valid_lifetime(self):
        """Valid lifetime

        Returns a datetime.timedelta built from the raw value taken
        as seconds, or None if the raw value is 0xFFFFFFFF.
        """
        secs = capi.rtnl_addr_get_valid_lifetime(self._rtnl_addr)
        # 0xFFFFFFFF presumably marks an unlimited lifetime -- confirm
        # against the libnl documentation.
        if secs == 0xFFFFFFFF:
            return None

        return datetime.timedelta(seconds=secs)

    @valid_lifetime.setter
    def valid_lifetime(self, value):
        # NOTE(review): the getter returns a timedelta (or None) while
        # the setter applies int(); values read from this property
        # cannot be assigned back unchanged.
        capi.rtnl_addr_set_valid_lifetime(self._rtnl_addr, int(value))

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def preferred_lifetime(self):
        """Preferred lifetime

        Returns a datetime.timedelta built from the raw value taken
        as seconds, or None if the raw value is 0xFFFFFFFF.
        """
        secs = capi.rtnl_addr_get_preferred_lifetime(self._rtnl_addr)
        # 0xFFFFFFFF presumably marks an unlimited lifetime -- confirm
        # against the libnl documentation.
        if secs == 0xFFFFFFFF:
            return None

        return datetime.timedelta(seconds=secs)

    @preferred_lifetime.setter
    def preferred_lifetime(self, value):
        # NOTE(review): same getter/setter type mismatch as
        # valid_lifetime; the value is stored via int().
        capi.rtnl_addr_set_preferred_lifetime(self._rtnl_addr, int(value))

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def create_time(self):
        """Creation time

        Returned as a datetime.timedelta; each unit of the raw value
        is counted as 10 milliseconds.
        """
        hsec = capi.rtnl_addr_get_create_time(self._rtnl_addr)
        return datetime.timedelta(milliseconds=10*hsec)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def last_update(self):
        """Last update

        Returned as a datetime.timedelta; each unit of the raw value
        is counted as 10 milliseconds.
        """
        hsec = capi.rtnl_addr_get_last_update_time(self._rtnl_addr)
        return datetime.timedelta(milliseconds=10*hsec)

    def add(self, socket=None, flags=None):
        """Attempt to add this address through rtnl_addr_add()

        socket -- socket to use; when omitted, one is obtained from
                  netlink.lookup_socket(netlink.NETLINK_ROUTE)
        flags  -- request flags; defaults to netlink.NLM_F_CREATE when
                  omitted or falsy

        Raises netlink.KernelError if the call returns a negative
        error code.
        """
        if not socket:
            socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)

        if not flags:
            flags = netlink.NLM_F_CREATE

        ret = capi.rtnl_addr_add(socket._sock, self._rtnl_addr, flags)
        if ret < 0:
            raise netlink.KernelError(ret)

    def delete(self, socket, flags=0):
        """Attempt to delete this address in the kernel

        Raises netlink.KernelError if the call returns a negative
        error code.
        """
        ret = capi.rtnl_addr_delete(socket._sock, self._rtnl_addr, flags)
        if ret < 0:
            raise netlink.KernelError(ret)

    ###################################################################
    # private properties
    #
    # Used for formatting output. USE AT OWN RISK
    @property
    def _flags(self):
        # Comma separated form of `flags`, referenced by format().
        return ','.join(self.flags)

    def format(self, details=False, stats=False, nodev=False, indent=''):
        """Return address as formatted text

        details -- append broadcast/multicast/peer/anycast addresses
                   and, when set, the valid and preferred lifetimes
        stats   -- append creation and last-update times if either of
                   them is non-zero
        nodev   -- omit the interface index
        indent  -- indentation string handed to util.MyFormatter
        """
        fmt = util.MyFormatter(self, indent)

        buf = fmt.format('{a|local!b}')

        if not nodev:
            buf += fmt.format(' {a|ifindex}')

        buf += fmt.format(' {a|scope}')

        if self.label:
            buf += fmt.format(' "{a|label}"')

        buf += fmt.format(' <{a|_flags}>')

        if details:
            buf += fmt.nl('\t{t|broadcast} {t|multicast}') \
                 + fmt.nl('\t{t|peer} {t|anycast}')

            if self.valid_lifetime:
                buf += fmt.nl('\t{s|valid-lifetime!k} '
                              '{a|valid_lifetime}')

            if self.preferred_lifetime:
                buf += fmt.nl('\t{s|preferred-lifetime!k} '
                              '{a|preferred_lifetime}')

        if stats and (self.create_time or self.last_update):
            # Continuation lines come from the formatter, as in the
            # branches above; this line previously called self.nl().
            buf += fmt.nl('\t{s|created!k} {a|create_time}'
                          ' {s|last-updated!k} {a|last_update}')

        return buf
368