source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/contrib/pysimplesoap/wsse.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 10.1 KB
Line 
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU Lesser General Public License as published by
5# the Free Software Foundation; either version 3, or (at your option) any
6# later version.
7#
8# This program is distributed in the hope that it will be useful, but
9# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11# for more details.
12
13"""Pythonic simple SOAP Client plugins for WebService Security extensions"""
14
15
16from __future__ import unicode_literals
17import sys
# Python 3 removed ``basestring`` and ``unicode``; alias both to ``str`` so
# code written against the Python 2 names keeps working.
# Compare the numeric major version: comparing ``sys.version`` as a string is
# lexicographic and would misclassify a two-digit major version.
if sys.version_info[0] >= 3:
    basestring = unicode = str
20
21import datetime
22from decimal import Decimal
23import os
24import logging
25import hashlib
26import warnings
27
28from . import __author__, __copyright__, __license__, __version__
29from .simplexml import SimpleXMLElement
30
31import random
32import string
33from hashlib import sha1
34
def randombytes(N):
    """Return a random text string of ``N`` characters.

    Despite the name, the result is text (not ``bytes``): each character is
    drawn from the uppercase ASCII letters and the decimal digits using
    ``random.SystemRandom``. ``N == 0`` yields the empty string.
    """
    # Create the generator and the alphabet once, not once per character.
    rng = random.SystemRandom()
    alphabet = string.ascii_uppercase + string.digits
    return ''.join(rng.choice(alphabet) for _ in range(N))
37
# Namespaces and token-profile identifiers used by the WS-Security plugins.
# Every OASIS identifier below shares the same WSS 2004/01 prefix, so it is
# spelled once and the distinguishing suffix is appended per constant.

_OASIS_WSS = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-"

# wsse: security extension elements (Security, UsernameToken, ...)
WSSE_URI = _OASIS_WSS + "wssecurity-secext-1.0.xsd"
# wsu: utility attributes/elements (Id, Created)
WSU_URI = _OASIS_WSS + "wssecurity-utility-1.0.xsd"
# ds: XML digital signature namespace (also the prefix of algorithm URIs)
XMLDSIG_URI = "http://www.w3.org/2000/09/xmldsig#"
# ValueType of an X.509 v3 binary security token
X509v3_URI = _OASIS_WSS + "x509-token-profile-1.0#X509v3"
# EncodingType of base64 encoded binary content
Base64Binary_URI = _OASIS_WSS + "soap-message-security-1.0#Base64Binary"
# Password Type announcing a digest instead of a clear-text password
PasswordDigest_URI = _OASIS_WSS + "username-token-profile-1.0#PasswordDigest"
46
47
class UsernameToken:
    """WS-Security plugin that sends clear-text username/password credentials.

    The credentials are kept in ``self.token`` as a nested dict keyed by the
    ``wsse:`` element names, ready to be marshalled into the SOAP header.
    """

    def __init__(self, username="", password=""):
        credentials = {
            'wsse:Username': username,
            'wsse:Password': password,
        }
        self.token = {'wsse:UsernameToken': credentials}

    def preprocess(self, client, request, method, args, kwargs, headers, soap_uri):
        """Write the ``wsse:Security`` element into the outgoing SOAP header.

        If ``headers`` carries a ``wsse:Security`` entry it replaces the
        stored token (backward compatibility); note that the replacement is
        kept on the instance for later calls as well.
        """
        security_tag = 'wsse:Security'
        # the WS Security header is always extracted and sent
        soap_header = request('Header', ns=soap_uri)
        if security_tag in headers:
            self.token = headers[security_tag]
        # serialize the token dict as xml below the SOAP header
        soap_header.marshall(security_tag, self.token,
                             ns=False, add_children_ns=False)
        security_node = soap_header(security_tag)
        security_node['xmlns:wsse'] = WSSE_URI

    def postprocess(self, client, response, method, args, kwargs, headers, soap_uri):
        """Hook for incoming credentials; currently does nothing."""
        # TODO: add some password validation callback?
        return None
76
class UsernameDigestToken(UsernameToken):
    """
    WebService Security extension to add digest credentials to the xml request.

    Instead of the clear-text password, the header carries
    base64(sha1(nonce + created + password)) together with the nonce and the
    creation timestamp used to compute it.

    drift -> time difference from the server in seconds, needed for 'Created' header
    """

    def __init__(self, username="", password="", drift=0):
        self.username = username
        self.password = password
        self.drift = datetime.timedelta(seconds=drift)

    @staticmethod
    def _to_bytes(value):
        "Return value unchanged if it is already bytes, else UTF-8 encode it"
        return value if isinstance(value, bytes) else value.encode('utf-8')

    @staticmethod
    def _b64_text(raw):
        "Base64 encode raw bytes and return the result as text (no newline)"
        # str.encode('base64') only exists on Python 2; the base64 module
        # behaves the same on both major versions.
        import base64
        return base64.b64encode(raw).decode('ascii')

    def preprocess(self, client, request, method, args, kwargs, headers, soap_uri):
        "Add the wsse:Security header with a password digest to the request"
        header = request('Header', ns=soap_uri, )
        wsse = header.add_child('wsse:Security', ns=False)
        wsse['xmlns:wsse'] = WSSE_URI
        wsse['xmlns:wsu'] = WSU_URI

        usertoken = wsse.add_child('wsse:UsernameToken', ns=False)
        usertoken.add_child('wsse:Username', self.username, ns=False)

        # UTC timestamp, shifted by the configured drift from the server clock
        created = (datetime.datetime.utcnow() + self.drift).isoformat() + 'Z'
        usertoken.add_child('wsu:Created', created, ns=False)

        # the nonce is hashed as raw bytes and transmitted base64 encoded
        nonce = self._to_bytes(randombytes(16))
        wssenonce = usertoken.add_child('wsse:Nonce', self._b64_text(nonce), ns=False)
        wssenonce['EncodingType'] = Base64Binary_URI

        # hashlib only accepts bytes, so every component is encoded first
        sha1obj = sha1()
        sha1obj.update(nonce + self._to_bytes(created) + self._to_bytes(self.password))
        digest = sha1obj.digest()
        password = usertoken.add_child('wsse:Password', self._b64_text(digest), ns=False)
        password['Type'] = PasswordDigest_URI
109
110
# Template of the wsse:Security header sent with a signed request.
# It is filled with the "%" operator from a mapping providing three keys:
#   certificate     -> text placed inside wsse:BinarySecurityToken
#   signed_info     -> xml placed at the start of ds:Signature
#   signature_value -> text placed inside ds:SignatureValue
# The element ids (CertId-..., KeyId-..., STRId-...) are fixed literals.
BIN_TOKEN_TMPL = """<?xml version="1.0" encoding="UTF-8"?>
<wsse:Security soapenv:mustUnderstand="1" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsse="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd">
    <wsse:BinarySecurityToken EncodingType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary" ValueType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3" wsu:Id="CertId-45851B081998E431E8132880700036719" xmlns:wsu="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">
%(certificate)s</wsse:BinarySecurityToken>
    <ds:Signature Id="Signature-13" xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
        %(signed_info)s
        <ds:SignatureValue>%(signature_value)s</ds:SignatureValue>
        <ds:KeyInfo Id="KeyId-45851B081998E431E8132880700036720">
            <wsse:SecurityTokenReference wsu:Id="STRId-45851B081998E431E8132880700036821" xmlns:wsu="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">
                <wsse:Reference URI="#CertId-45851B081998E431E8132880700036719" ValueType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3"/>
            </wsse:SecurityTokenReference>
        </ds:KeyInfo>
    </ds:Signature>
</wsse:Security>
"""
126
class BinaryTokenSignature:
    """WebService Security extension to add a basic signature to xml request

    Outgoing requests get their SOAP Body signed (RSA-SHA1) and a
    wsse:Security header with the certificate attached; incoming responses
    have their certificate, body digest and signature verified.
    """

    def __init__(self, certificate="", private_key="", password=None, cacert=None):
        """Load the certificate and remember the signing parameters.

        certificate -> path of the X509v3 certificate file (PEM), read here
        private_key, password -> passed as-is to xmlsec.rsa_sign
        cacert -> passed to xmlsec.x509_verify; if empty, the response
                  certificate is not validated (a warning is issued)
        """
        # read the X509v3 certificate (PEM), dropping the "-----BEGIN/END"
        # marker lines; the context manager closes the file handle
        with open(certificate) as pem_file:
            self.certificate = ''.join(line for line in pem_file
                                       if not line.startswith("---"))
        self.private_key = private_key
        self.password = password
        self.cacert = cacert

    def preprocess(self, client, request, method, args, kwargs, headers, soap_uri):
        "Sign the outgoing SOAP request"
        # get xml elements:
        body = request('Body', ns=soap_uri, )
        header = request('Header', ns=soap_uri, )
        # prepare body xml attributes to be signed (reference)
        body['wsu:Id'] = "id-14"
        body['xmlns:wsu'] = WSU_URI
        # workaround: copy namespaces so lxml can parse the xml to be signed
        for attr, value in request[:]:
            if attr.startswith("xmlns"):
                body[attr] = value
        # use the internal tag xml representation (not the full xml document)
        ref_xml = repr(body)
        # sign using RSA-SHA1 (XML Security)
        from . import xmlsec
        # the returned mapping fills the template placeholders (presumably
        # signed_info and signature_value -- confirm against xmlsec.rsa_sign)
        template_vars = xmlsec.rsa_sign(ref_xml, "#id-14",
                                        self.private_key, self.password)
        template_vars['certificate'] = self.certificate
        # generate the xml (filling the placeholders)
        wsse = SimpleXMLElement(BIN_TOKEN_TMPL % template_vars)
        header.import_node(wsse)

    def postprocess(self, client, response, method, args, kwargs, headers, soap_uri):
        """Verify the signature of the incoming response

        Raises RuntimeError if a sanity check, the certificate validation,
        the body digest comparison or the signature verification fails.
        """
        import base64
        from . import xmlsec
        # get xml elements:
        body = response('Body', ns=soap_uri, )
        header = response('Header', ns=soap_uri, )
        wsse = header("Security", ns=WSSE_URI)
        cert = wsse("BinarySecurityToken", ns=WSSE_URI)
        # check that the cert (binary token) is coming in the correct format:
        self.__check(cert["EncodingType"], Base64Binary_URI)
        self.__check(cert["ValueType"], X509v3_URI)
        # extract the certificate (in DER to avoid new line & padding issues!)
        # base64.b64decode replaces the Python 2 only str.decode("base64")
        cert_der = base64.b64decode(str(cert))
        public_key = xmlsec.x509_extract_rsa_public_key(cert_der, binary=True)
        # validate the certificate using the certification authority:
        if not self.cacert:
            warnings.warn("No CA provided, WSSE not validating certificate")
        elif not xmlsec.x509_verify(self.cacert, cert_der, binary=True):
            raise RuntimeError("WSSE certificate validation failed")
        # check body xml attributes was signed correctly (reference)
        self.__check(body['xmlns:wsu'], WSU_URI)
        ref_uri = body['wsu:Id']
        signature = wsse("Signature", ns=XMLDSIG_URI)
        signed_info = signature("SignedInfo", ns=XMLDSIG_URI)
        signature_value = signature("SignatureValue", ns=XMLDSIG_URI)
        # TODO: these sanity checks should be moved to xmlsec?
        self.__check(signed_info("Reference", ns=XMLDSIG_URI)['URI'], "#" + ref_uri)
        self.__check(signed_info("SignatureMethod", ns=XMLDSIG_URI)['Algorithm'],
                     XMLDSIG_URI + "rsa-sha1")
        self.__check(signed_info("Reference", ns=XMLDSIG_URI)("DigestMethod", ns=XMLDSIG_URI)['Algorithm'],
                     XMLDSIG_URI + "sha1")
        # TODO: check KeyInfo uses the correct SecurityTokenReference
        # workaround: copy namespaces so lxml can parse the xml to be signed
        for attr, value in response[:]:
            if attr.startswith("xmlns"):
                body[attr] = value
        # use the internal tag xml representation (not the full xml document)
        ref_xml = xmlsec.canonicalize(repr(body))
        # verify the signed hash
        computed_hash = xmlsec.sha1_hash_digest(ref_xml)
        digest_value = str(signed_info("Reference", ns=XMLDSIG_URI)("DigestValue", ns=XMLDSIG_URI))
        if computed_hash != digest_value:
            raise RuntimeError("WSSE SHA1 hash digests mismatch")
        # workaround: prepare the signed info (assure the parent ns is present)
        signed_info['xmlns'] = XMLDSIG_URI
        xml = repr(signed_info)
        # verify the signature using RSA-SHA1 (XML Security)
        ok = xmlsec.rsa_verify(xml, str(signature_value), public_key)
        if not ok:
            raise RuntimeError("WSSE RSA-SHA1 signature verification failed")
        # TODO: remove any unsigned part from the xml?

    def __check(self, value, expected, msg="WSSE sanity check failed"):
        "Raise RuntimeError(msg) unless value equals expected"
        if value != expected:
            raise RuntimeError(msg)
Note: See TracBrowser for help on using the repository browser.