source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/utils.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 13.7 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#pylint: disable=invalid-name,redefined-builtin
4
5"""
6| This file is part of the web2py Web Framework
7| Copyrighted by Massimo Di Pierro <mdipierro@cs.depaul.edu>
8| License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html)
9
10This file specifically includes utilities for security.
11--------------------------------------------------------
12"""
13
14import threading
15import struct
16import uuid
17import random
18import inspect
19import time
20import os
21import sys
22import re
23import logging
24import socket
25import base64
26import zlib
27import hashlib
28import hmac
29from gluon._compat import basestring, pickle, PY2, xrange, to_bytes, to_native
30
31_struct_2_long_long = struct.Struct('=QQ')
32
33try:
34    from Crypto.Cipher import AES
35    HAVE_AES = True
36except ImportError:
37    import gluon.contrib.pyaes as PYAES
38    HAVE_AES = False
39
40
41HAVE_COMPARE_DIGEST = False
42if hasattr(hmac, 'compare_digest'):
43    HAVE_COMPARE_DIGEST = True
44
45logger = logging.getLogger("web2py")
46
47
48def AES_new(key, IV=None):
49    """Return an AES cipher object and random IV if None specified."""
50    if IV is None:
51        IV = fast_urandom16()
52    if HAVE_AES:
53        return AES.new(key, AES.MODE_CBC, IV), IV
54    else:
55        return PYAES.AESModeOfOperationCBC(key, iv=IV), IV
56
57
58def AES_enc(cipher, data):
59    """Encrypt data with the cipher."""
60    if HAVE_AES:
61        return cipher.encrypt(data)
62    else:
63        encrypter = PYAES.Encrypter(cipher)
64        enc = encrypter.feed(data)
65        enc += encrypter.feed()
66        return enc
67
68
69def AES_dec(cipher, data):
70    """Decrypt data with the cipher."""
71    if HAVE_AES:
72        return cipher.decrypt(data)
73    else:
74        decrypter = PYAES.Decrypter(cipher)
75        dec = decrypter.feed(data)
76        dec += decrypter.feed()
77        return dec
78
79
80def compare(a, b):
81    """ Compares two strings and not vulnerable to timing attacks """
82    if HAVE_COMPARE_DIGEST:
83        return hmac.compare_digest(a, b)
84    result = len(a) ^ len(b)
85    for i in xrange(len(b)):
86        result |= ord(a[i % len(a)]) ^ ord(b[i])
87    return result == 0
88
89
90def md5_hash(text):
91    """Generate an md5 hash with the given text."""
92    return hashlib.md5(to_bytes(text)).hexdigest()
93
94
95def get_callable_argspec(fn):
96    if inspect.isfunction(fn) or inspect.ismethod(fn):
97        inspectable = fn
98    elif inspect.isclass(fn):
99        inspectable = fn.__init__
100    elif hasattr(fn, '__call__'):
101        inspectable = fn.__call__
102    else:
103        inspectable = fn
104    return inspect.getargspec(inspectable)
105
106
107def pad(s, n=32):
108    """does padding according to PKCS7v1.5 https://www.ietf.org/rfc/rfc2315.txt"""
109    padlen = n - len(s) % n
110    return s + bytes(bytearray(padlen * [padlen]))
111
112
113def unpad(s, n=32):
114    """removed padding"""
115    padlen = s[-1]
116    if isinstance(padlen, str):
117        padlen = ord(padlen)  # python2
118    if (padlen < 1) | (padlen > n):  # avoid short-circuit
119        # return garbage to minimize side channels
120        return bytes(bytearray(len(s) * [0]))
121    return s[:-padlen]
122
123
124def secure_dumps(data, encryption_key, hash_key=None, compression_level=None):
125    """dumps data, followed by a signature"""
126    dump = pickle.dumps(data, pickle.HIGHEST_PROTOCOL)
127    if compression_level:
128        dump = zlib.compress(dump, compression_level)
129    encryption_key = to_bytes(encryption_key)
130    if not hash_key:
131        hash_key = hashlib.sha256(encryption_key).digest()
132    cipher, IV = AES_new(pad(encryption_key)[:32])
133    encrypted_data = base64.urlsafe_b64encode(IV + AES_enc(cipher, pad(dump)))
134    signature = to_bytes(hmac.new(to_bytes(hash_key), encrypted_data, hashlib.sha256).hexdigest())
135    return b'hmac256:' + signature + b':' + encrypted_data
136
137
138def secure_loads(data, encryption_key, hash_key=None, compression_level=None):
139    """loads a signed data dump"""
140    data = to_bytes(data)
141    components = data.count(b':')
142    if components == 1:
143        return secure_loads_deprecated(data, encryption_key, hash_key, compression_level)
144    if components != 2:
145        return None
146    version, signature, encrypted_data = data.split(b':', 2)
147    if version != b'hmac256':
148        return None
149    encryption_key = to_bytes(encryption_key)
150    if not hash_key:
151        hash_key = hashlib.sha256(encryption_key).digest()
152    actual_signature = hmac.new(to_bytes(hash_key), encrypted_data, hashlib.sha256).hexdigest()
153    if not compare(to_native(signature), actual_signature):
154        return None
155    encrypted_data = base64.urlsafe_b64decode(encrypted_data)
156    IV, encrypted_data = encrypted_data[:16], encrypted_data[16:]
157    cipher, _ = AES_new(pad(encryption_key)[:32], IV=IV)
158    try:
159        data = unpad(AES_dec(cipher, encrypted_data))
160        if compression_level:
161            data = zlib.decompress(data)
162        return pickle.loads(data)
163    except Exception:
164        return None
165
166
167def __pad_deprecated(s, n=32, padchar=b' '):
168    """reprecated data, here for backward compatibility"""
169    return s + (n - len(s) % n) * padchar
170
171
172def secure_dumps_deprecated(data, encryption_key, hash_key=None, compression_level=None):
173    """dumps data with a signature (deprecated because of incorrect padding)"""
174    encryption_key = to_bytes(encryption_key)
175    if not hash_key:
176        hash_key = hashlib.sha1(encryption_key).hexdigest()
177    dump = pickle.dumps(data, pickle.HIGHEST_PROTOCOL)
178    if compression_level:
179        dump = zlib.compress(dump, compression_level)
180    key = __pad_deprecated(encryption_key)[:32]
181    cipher, IV = AES_new(key)
182    encrypted_data = base64.urlsafe_b64encode(IV + AES_enc(cipher, pad(dump)))
183    signature = to_bytes(hmac.new(to_bytes(hash_key), encrypted_data, hashlib.md5).hexdigest())
184    return signature + b':' + encrypted_data
185
186
187def secure_loads_deprecated(data, encryption_key, hash_key=None, compression_level=None):
188    """loads signed data (deprecated because of incorrect padding)"""
189    encryption_key = to_bytes(encryption_key)
190    data = to_native(data)
191    if ':' not in data:
192        return None
193    if not hash_key:
194        hash_key = hashlib.sha1(encryption_key).hexdigest()
195    signature, encrypted_data = data.split(':', 1)
196    encrypted_data = to_bytes(encrypted_data)
197    actual_signature = hmac.new(to_bytes(hash_key), encrypted_data, hashlib.md5).hexdigest()
198    if not compare(signature, actual_signature):
199        return None
200    key = __pad_deprecated(encryption_key)[:32]
201    encrypted_data = base64.urlsafe_b64decode(encrypted_data)
202    IV, encrypted_data = encrypted_data[:16], encrypted_data[16:]
203    cipher, _ = AES_new(key, IV=IV)
204    try:
205        data = AES_dec(cipher, encrypted_data)
206        data = data.rstrip(b' ')
207        if compression_level:
208            data = zlib.decompress(data)
209        return pickle.loads(data)
210    except Exception:
211        return None
212
213### compute constant CTOKENS
214
215
216def initialize_urandom():
217    """
218    This function and the web2py_uuid follow from the following discussion:
219    `http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`
220
221    At startup web2py compute a unique ID that identifies the machine by adding
222    uuid.getnode() + int(time.time() * 1e3)
223
224    This is a 48-bit number. It converts the number into 16 8-bit tokens.
225    It uses this value to initialize the entropy source ('/dev/urandom') and to seed random.
226
227    If os.random() is not supported, it falls back to using random and issues a warning.
228    """
229    node_id = uuid.getnode()
230    microseconds = int(time.time() * 1e6)
231    ctokens = [((node_id + microseconds) >> ((i % 6) * 8)) %
232               256 for i in range(16)]
233    random.seed(node_id + microseconds)
234    try:
235        os.urandom(1)
236        have_urandom = True
237        if sys.platform != 'win32':
238            try:
239                # try to add process-specific entropy
240                frandom = open('/dev/urandom', 'wb')
241                try:
242                    if PY2:
243                        frandom.write(''.join(chr(t) for t in ctokens))
244                    else:
245                        frandom.write(bytes([]).join(bytes([t]) for t in ctokens))
246                finally:
247                    frandom.close()
248            except IOError:
249                # works anyway
250                pass
251    except NotImplementedError:
252        have_urandom = False
253        logger.warning(
254            """Cryptographically secure session management is not possible on your system because
255your system does not provide a cryptographically secure entropy source.
256This is not specific to web2py; consider deploying on a different operating system.""")
257    if PY2:
258        packed = ''.join(chr(x) for x in ctokens)
259    else:
260        packed = bytes([]).join(bytes([x]) for x in ctokens)
261    unpacked_ctokens = _struct_2_long_long.unpack(packed)
262    return unpacked_ctokens, have_urandom
263UNPACKED_CTOKENS, HAVE_URANDOM = initialize_urandom()
264
265
266def fast_urandom16(urandom=[], locker=threading.RLock()):
267    """
268    This is 4x faster than calling os.urandom(16) and prevents
269    the "too many files open" issue with concurrent access to os.urandom()
270    """
271    try:
272        return urandom.pop()
273    except IndexError:
274        try:
275            locker.acquire()
276            ur = os.urandom(16 * 1024)
277            urandom += [ur[i:i + 16] for i in xrange(16, 1024 * 16, 16)]
278            return ur[0:16]
279        finally:
280            locker.release()
281
282
283def web2py_uuid(ctokens=UNPACKED_CTOKENS):
284    """
285    This function follows from the following discussion:
286    `http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`
287
288    It works like uuid.uuid4 except that tries to use os.urandom() if possible
289    and it XORs the output with the tokens uniquely associated with this machine.
290    """
291    rand_longs = (random.getrandbits(64), random.getrandbits(64))
292    if HAVE_URANDOM:
293        urand_longs = _struct_2_long_long.unpack(fast_urandom16())
294        byte_s = _struct_2_long_long.pack(rand_longs[0] ^ urand_longs[0] ^ ctokens[0],
295                                          rand_longs[1] ^ urand_longs[1] ^ ctokens[1])
296    else:
297        byte_s = _struct_2_long_long.pack(rand_longs[0] ^ ctokens[0],
298                                          rand_longs[1] ^ ctokens[1])
299    return str(uuid.UUID(bytes=byte_s, version=4))
300
301REGEX_IPv4 = re.compile(r'(\d+)\.(\d+)\.(\d+)\.(\d+)')
302
303
304def is_valid_ip_address(address):
305    """
306    Examples:
307        Better than a thousand words::
308
309            >>> is_valid_ip_address('127.0')
310            False
311            >>> is_valid_ip_address('127.0.0.1')
312            True
313            >>> is_valid_ip_address('2001:660::1')
314            True
315    """
316    # deal with special cases
317    if address.lower() in ('127.0.0.1', 'localhost', '::1', '::ffff:127.0.0.1'):
318        return True
319    elif address.lower() in ('unknown', ''):
320        return False
321    elif address.count('.') == 3:  # assume IPv4
322        if address.startswith('::ffff:'):
323            address = address[7:]
324        if hasattr(socket, 'inet_aton'):  # try validate using the OS
325            try:
326                socket.inet_aton(address)
327                return True
328            except socket.error:  # invalid address
329                return False
330        else:  # try validate using Regex
331            match = REGEX_IPv4.match(address)
332            if match and all(0 <= int(match.group(i)) < 256 for i in (1, 2, 3, 4)):
333                return True
334            return False
335    elif hasattr(socket, 'inet_pton'):  # assume IPv6, try using the OS
336        try:
337            socket.inet_pton(socket.AF_INET6, address)
338            return True
339        except socket.error:  # invalid address
340            return False
341    else:  # do not know what to do? assume it is a valid address
342        return True
343
344
345def is_loopback_ip_address(ip=None, addrinfo=None):
346    """
347    Determines whether the address appears to be a loopback address.
348    This assumes that the IP is valid.
349    """
350    if addrinfo:  # see socket.getaddrinfo() for layout of addrinfo tuple
351        if addrinfo[0] == socket.AF_INET or addrinfo[0] == socket.AF_INET6:
352            ip = addrinfo[4]
353    if not isinstance(ip, basestring):
354        return False
355    # IPv4 or IPv6-embedded IPv4 or IPv4-compatible IPv6
356    if ip.count('.') == 3:
357        return ip.lower().startswith(('127', '::127', '0:0:0:0:0:0:127',
358                                      '::ffff:127', '0:0:0:0:0:ffff:127'))
359    return ip == '::1' or ip == '0:0:0:0:0:0:0:1'   # IPv6 loopback
360
361
362def getipaddrinfo(host):
363    """
364    Filter out non-IP and bad IP addresses from getaddrinfo
365    """
366    try:
367        return [addrinfo for addrinfo in socket.getaddrinfo(host, None)
368                if (addrinfo[0] == socket.AF_INET or
369                    addrinfo[0] == socket.AF_INET6)
370                and isinstance(addrinfo[4][0], basestring)]
371    except socket.error:
372        return []
373
374
375def unlocalised_http_header_date(data):
376    """
377    Converts input datetime to format defined by RFC 7231, section 7.1.1.1
378
379    Previously, %a and %b formats were used for weekday and month names, but
380    those are not locale-safe. uWSGI requires latin1-encodable headers and
381    for example in cs_CS locale, fourth day in week is not encodable in latin1,
382    as it's "Čt".
383
384    Example output: Sun, 06 Nov 1994 08:49:37 GMT
385    """
386
387    short_weekday = {
388        "0": "Sun",
389        "1": "Mon",
390        "2": "Tue",
391        "3": "Wed",
392        "4": "Thu",
393        "5": "Fri",
394        "6": "Sat",
395    }.get(time.strftime("%w", data))
396
397    day_of_month = time.strftime("%d", data)
398
399    short_month = {
400        "01": "Jan",
401        "02": "Feb",
402        "03": "Mar",
403        "04": "Apr",
404        "05": "May",
405        "06": "Jun",
406        "07": "Jul",
407        "08": "Aug",
408        "09": "Sep",
409        "10": "Oct",
410        "11": "Nov",
411        "12": "Dec",
412    }.get(time.strftime("%m", data))
413
414    year_and_time = time.strftime("%Y %H:%M:%S GMT", data)
415
416    return "{}, {} {} {}".format(
417        short_weekday,
418        day_of_month,
419        short_month,
420        year_and_time)
Note: See TracBrowser for help on using the repository browser.