1# Copyright (C) 2001-2007, 2009, 2010 Nominum, Inc.
2#
3# Permission to use, copy, modify, and distribute this software and its
4# documentation for any purpose with or without fee is hereby granted,
5# provided that the above copyright notice and this permission notice
6# appear in all copies.
7#
8# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
9# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
11# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
14# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16"""DNS TSIG support."""
17
18import hmac
19import struct
20
21import dns.exception
22import dns.rdataclass
23import dns.name
24
25class BadTime(dns.exception.DNSException):
26    """Raised if the current time is not within the TSIG's validity time."""
27    pass
28
29class BadSignature(dns.exception.DNSException):
30    """Raised if the TSIG signature fails to verify."""
31    pass
32
33class PeerError(dns.exception.DNSException):
34    """Base class for all TSIG errors generated by the remote peer"""
35    pass
36
37class PeerBadKey(PeerError):
38    """Raised if the peer didn't know the key we used"""
39    pass
40
41class PeerBadSignature(PeerError):
42    """Raised if the peer didn't like the signature we sent"""
43    pass
44
45class PeerBadTime(PeerError):
46    """Raised if the peer didn't like the time we sent"""
47    pass
48
49class PeerBadTruncation(PeerError):
50    """Raised if the peer didn't like amount of truncation in the TSIG we sent"""
51    pass
52
53default_algorithm = "HMAC-MD5.SIG-ALG.REG.INT"
54
55BADSIG = 16
56BADKEY = 17
57BADTIME = 18
58BADTRUNC = 22
59
60def sign(wire, keyname, secret, time, fudge, original_id, error,
61         other_data, request_mac, ctx=None, multi=False, first=True,
62         algorithm=default_algorithm):
63    """Return a (tsig_rdata, mac, ctx) tuple containing the HMAC TSIG rdata
64    for the input parameters, the HMAC MAC calculated by applying the
65    TSIG signature algorithm, and the TSIG digest context.
66    @rtype: (string, string, hmac.HMAC object)
67    @raises ValueError: I{other_data} is too long
68    @raises NotImplementedError: I{algorithm} is not supported
69    """
70
71    (algorithm_name, digestmod) = get_algorithm(algorithm)
72    if first:
73        ctx = hmac.new(secret, digestmod=digestmod)
74        ml = len(request_mac)
75        if ml > 0:
76            ctx.update(struct.pack('!H', ml))
77            ctx.update(request_mac)
78    id = struct.pack('!H', original_id)
79    ctx.update(id)
80    ctx.update(wire[2:])
81    if first:
82        ctx.update(keyname.to_digestable())
83        ctx.update(struct.pack('!H', dns.rdataclass.ANY))
84        ctx.update(struct.pack('!I', 0))
85    long_time = time + 0L
86    upper_time = (long_time >> 32) & 0xffffL
87    lower_time = long_time & 0xffffffffL
88    time_mac = struct.pack('!HIH', upper_time, lower_time, fudge)
89    pre_mac = algorithm_name + time_mac
90    ol = len(other_data)
91    if ol > 65535:
92        raise ValueError('TSIG Other Data is > 65535 bytes')
93    post_mac = struct.pack('!HH', error, ol) + other_data
94    if first:
95        ctx.update(pre_mac)
96        ctx.update(post_mac)
97    else:
98        ctx.update(time_mac)
99    mac = ctx.digest()
100    mpack = struct.pack('!H', len(mac))
101    tsig_rdata = pre_mac + mpack + mac + id + post_mac
102    if multi:
103        ctx = hmac.new(secret)
104        ml = len(mac)
105        ctx.update(struct.pack('!H', ml))
106        ctx.update(mac)
107    else:
108        ctx = None
109    return (tsig_rdata, mac, ctx)
110
111def hmac_md5(wire, keyname, secret, time, fudge, original_id, error,
112             other_data, request_mac, ctx=None, multi=False, first=True,
113             algorithm=default_algorithm):
114    return sign(wire, keyname, secret, time, fudge, original_id, error,
115                other_data, request_mac, ctx, multi, first, algorithm)
116
117def validate(wire, keyname, secret, now, request_mac, tsig_start, tsig_rdata,
118             tsig_rdlen, ctx=None, multi=False, first=True):
119    """Validate the specified TSIG rdata against the other input parameters.
120
121    @raises FormError: The TSIG is badly formed.
122    @raises BadTime: There is too much time skew between the client and the
123    server.
124    @raises BadSignature: The TSIG signature did not validate
125    @rtype: hmac.HMAC object"""
126
127    (adcount,) = struct.unpack("!H", wire[10:12])
128    if adcount == 0:
129        raise dns.exception.FormError
130    adcount -= 1
131    new_wire = wire[0:10] + struct.pack("!H", adcount) + wire[12:tsig_start]
132    current = tsig_rdata
133    (aname, used) = dns.name.from_wire(wire, current)
134    current = current + used
135    (upper_time, lower_time, fudge, mac_size) = \
136                 struct.unpack("!HIHH", wire[current:current + 10])
137    time = ((upper_time + 0L) << 32) + (lower_time + 0L)
138    current += 10
139    mac = wire[current:current + mac_size]
140    current += mac_size
141    (original_id, error, other_size) = \
142                  struct.unpack("!HHH", wire[current:current + 6])
143    current += 6
144    other_data = wire[current:current + other_size]
145    current += other_size
146    if current != tsig_rdata + tsig_rdlen:
147        raise dns.exception.FormError
148    if error != 0:
149        if error == BADSIG:
150            raise PeerBadSignature
151        elif error == BADKEY:
152            raise PeerBadKey
153        elif error == BADTIME:
154            raise PeerBadTime
155        elif error == BADTRUNC:
156            raise PeerBadTruncation
157        else:
158            raise PeerError('unknown TSIG error code %d' % error)
159    time_low = time - fudge
160    time_high = time + fudge
161    if now < time_low or now > time_high:
162        raise BadTime
163    (junk, our_mac, ctx) = sign(new_wire, keyname, secret, time, fudge,
164                                original_id, error, other_data,
165                                request_mac, ctx, multi, first, aname)
166    if (our_mac != mac):
167        raise BadSignature
168    return ctx
169
170def get_algorithm(algorithm):
171    """Returns the wire format string and the hash module to use for the
172    specified TSIG algorithm
173
174    @rtype: (string, hash constructor)
175    @raises NotImplementedError: I{algorithm} is not supported
176    """
177
178    hashes = {}
179    try:
180        import hashlib
181        hashes[dns.name.from_text('hmac-sha224')] = hashlib.sha224
182        hashes[dns.name.from_text('hmac-sha256')] = hashlib.sha256
183        hashes[dns.name.from_text('hmac-sha384')] = hashlib.sha384
184        hashes[dns.name.from_text('hmac-sha512')] = hashlib.sha512
185        hashes[dns.name.from_text('hmac-sha1')] = hashlib.sha1
186        hashes[dns.name.from_text('HMAC-MD5.SIG-ALG.REG.INT')] = hashlib.md5
187
188        import sys
189        if sys.hexversion < 0x02050000:
190            # hashlib doesn't conform to PEP 247: API for
191            # Cryptographic Hash Functions, which hmac before python
192            # 2.5 requires, so add the necessary items.
193            class HashlibWrapper:
194                def __init__(self, basehash):
195                    self.basehash = basehash
196                    self.digest_size = self.basehash().digest_size
197
198                def new(self, *args, **kwargs):
199                    return self.basehash(*args, **kwargs)
200
201            for name in hashes:
202                hashes[name] = HashlibWrapper(hashes[name])
203
204    except ImportError:
205        import md5, sha
206        hashes[dns.name.from_text('HMAC-MD5.SIG-ALG.REG.INT')] =  md5.md5
207        hashes[dns.name.from_text('hmac-sha1')] = sha.sha
208
209    if isinstance(algorithm, (str, unicode)):
210        algorithm = dns.name.from_text(algorithm)
211
212    if algorithm in hashes:
213        return (algorithm.to_digestable(), hashes[algorithm])
214
215    raise NotImplementedError("TSIG algorithm " + str(algorithm) +
216                              " is not supported")
217