1 | #!/usr/bin/python |
---|
2 | # -*- coding: utf-8 -*- |
---|
3 | # This program is free software; you can redistribute it and/or modify |
---|
4 | # it under the terms of the GNU Lesser General Public License as published by |
---|
5 | # the Free Software Foundation; either version 3, or (at your option) any |
---|
6 | # later version. |
---|
7 | # |
---|
8 | # This program is distributed in the hope that it will be useful, but |
---|
9 | # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY |
---|
10 | # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
---|
11 | # for more details. |
---|
12 | |
---|
13 | """Pythonic simple SOAP Client plugins for WebService Security extensions""" |
---|
14 | |
---|
15 | |
---|
16 | from __future__ import unicode_literals |
---|
17 | import sys |
---|
18 | if sys.version > '3': |
---|
19 | basestring = unicode = str |
---|
20 | |
---|
21 | import datetime |
---|
22 | from decimal import Decimal |
---|
23 | import os |
---|
24 | import logging |
---|
25 | import hashlib |
---|
26 | import warnings |
---|
27 | |
---|
28 | from . import __author__, __copyright__, __license__, __version__ |
---|
29 | from .simplexml import SimpleXMLElement |
---|
30 | |
---|
31 | import random |
---|
32 | import string |
---|
33 | from hashlib import sha1 |
---|
34 | |
---|
35 | def randombytes(N): |
---|
36 | return ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(N)) |
---|
37 | |
---|
38 | # Namespaces: |
---|
39 | |
---|
40 | WSSE_URI = 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd' |
---|
41 | WSU_URI = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd" |
---|
42 | XMLDSIG_URI = "http://www.w3.org/2000/09/xmldsig#" |
---|
43 | X509v3_URI = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3" |
---|
44 | Base64Binary_URI = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary" |
---|
45 | PasswordDigest_URI = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordDigest" |
---|
46 | |
---|
47 | |
---|
48 | class UsernameToken: |
---|
49 | "WebService Security extension to add a basic credentials to xml request" |
---|
50 | |
---|
51 | def __init__(self, username="", password=""): |
---|
52 | self.token = { |
---|
53 | 'wsse:UsernameToken': { |
---|
54 | 'wsse:Username': username, |
---|
55 | 'wsse:Password': password, |
---|
56 | } |
---|
57 | } |
---|
58 | |
---|
59 | def preprocess(self, client, request, method, args, kwargs, headers, soap_uri): |
---|
60 | "Add basic credentials to outgoing message" |
---|
61 | # always extract WS Security header and send it |
---|
62 | header = request('Header', ns=soap_uri, ) |
---|
63 | k = 'wsse:Security' |
---|
64 | # for backward compatibility, use header if given: |
---|
65 | if k in headers: |
---|
66 | self.token = headers[k] |
---|
67 | # convert the token to xml |
---|
68 | header.marshall(k, self.token, ns=False, add_children_ns=False) |
---|
69 | header(k)['xmlns:wsse'] = WSSE_URI |
---|
70 | #<wsse:UsernameToken xmlns:wsu='http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd'> |
---|
71 | |
---|
72 | def postprocess(self, client, response, method, args, kwargs, headers, soap_uri): |
---|
73 | "Analyze incoming credentials" |
---|
74 | # TODO: add some password validation callback? |
---|
75 | pass |
---|
76 | |
---|
77 | class UsernameDigestToken(UsernameToken): |
---|
78 | """ |
---|
79 | WebService Security extension to add a http digest credentials to xml request |
---|
80 | drift -> time difference from the server in seconds, needed for 'Created' header |
---|
81 | """ |
---|
82 | |
---|
83 | def __init__(self, username="", password="", drift=0): |
---|
84 | self.username = username |
---|
85 | self.password = password |
---|
86 | self.drift = datetime.timedelta(seconds=drift) |
---|
87 | |
---|
88 | def preprocess(self, client, request, method, args, kwargs, headers, soap_uri): |
---|
89 | header = request('Header', ns=soap_uri, ) |
---|
90 | wsse = header.add_child('wsse:Security', ns=False) |
---|
91 | wsse['xmlns:wsse'] = WSSE_URI |
---|
92 | wsse['xmlns:wsu'] = WSU_URI |
---|
93 | |
---|
94 | usertoken = wsse.add_child('wsse:UsernameToken', ns=False) |
---|
95 | usertoken.add_child('wsse:Username', self.username, ns=False) |
---|
96 | |
---|
97 | created = (datetime.datetime.utcnow() + self.drift).isoformat() + 'Z' |
---|
98 | usertoken.add_child('wsu:Created', created, ns=False) |
---|
99 | |
---|
100 | nonce = randombytes(16) |
---|
101 | wssenonce = usertoken.add_child('wsse:Nonce', nonce.encode('base64')[:-1], ns=False) |
---|
102 | wssenonce['EncodingType'] = Base64Binary_URI |
---|
103 | |
---|
104 | sha1obj = sha1() |
---|
105 | sha1obj.update(nonce + created + self.password) |
---|
106 | digest = sha1obj.digest() |
---|
107 | password = usertoken.add_child('wsse:Password', digest.encode('base64')[:-1], ns=False) |
---|
108 | password['Type'] = PasswordDigest_URI |
---|
109 | |
---|
110 | |
---|
111 | BIN_TOKEN_TMPL = """<?xml version="1.0" encoding="UTF-8"?> |
---|
112 | <wsse:Security soapenv:mustUnderstand="1" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsse="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd"> |
---|
113 | <wsse:BinarySecurityToken EncodingType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary" ValueType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3" wsu:Id="CertId-45851B081998E431E8132880700036719" xmlns:wsu="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd"> |
---|
114 | %(certificate)s</wsse:BinarySecurityToken> |
---|
115 | <ds:Signature Id="Signature-13" xmlns:ds="http://www.w3.org/2000/09/xmldsig#"> |
---|
116 | %(signed_info)s |
---|
117 | <ds:SignatureValue>%(signature_value)s</ds:SignatureValue> |
---|
118 | <ds:KeyInfo Id="KeyId-45851B081998E431E8132880700036720"> |
---|
119 | <wsse:SecurityTokenReference wsu:Id="STRId-45851B081998E431E8132880700036821" xmlns:wsu="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd"> |
---|
120 | <wsse:Reference URI="#CertId-45851B081998E431E8132880700036719" ValueType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3"/> |
---|
121 | </wsse:SecurityTokenReference> |
---|
122 | </ds:KeyInfo> |
---|
123 | </ds:Signature> |
---|
124 | </wsse:Security> |
---|
125 | """ |
---|
126 | |
---|
127 | class BinaryTokenSignature: |
---|
128 | "WebService Security extension to add a basic signature to xml request" |
---|
129 | |
---|
130 | def __init__(self, certificate="", private_key="", password=None, cacert=None): |
---|
131 | # read the X509v3 certificate (PEM) |
---|
132 | self.certificate = ''.join([line for line in open(certificate) |
---|
133 | if not line.startswith("---")]) |
---|
134 | self.private_key = private_key |
---|
135 | self.password = password |
---|
136 | self.cacert = cacert |
---|
137 | |
---|
138 | def preprocess(self, client, request, method, args, kwargs, headers, soap_uri): |
---|
139 | "Sign the outgoing SOAP request" |
---|
140 | # get xml elements: |
---|
141 | body = request('Body', ns=soap_uri, ) |
---|
142 | header = request('Header', ns=soap_uri, ) |
---|
143 | # prepare body xml attributes to be signed (reference) |
---|
144 | body['wsu:Id'] = "id-14" |
---|
145 | body['xmlns:wsu'] = WSU_URI |
---|
146 | # workaround: copy namespaces so lxml can parse the xml to be signed |
---|
147 | for attr, value in request[:]: |
---|
148 | if attr.startswith("xmlns"): |
---|
149 | body[attr] = value |
---|
150 | # use the internal tag xml representation (not the full xml document) |
---|
151 | ref_xml = repr(body) |
---|
152 | # sign using RSA-SHA1 (XML Security) |
---|
153 | from . import xmlsec |
---|
154 | vars = xmlsec.rsa_sign(ref_xml, "#id-14", |
---|
155 | self.private_key, self.password) |
---|
156 | vars['certificate'] = self.certificate |
---|
157 | # generate the xml (filling the placeholders) |
---|
158 | wsse = SimpleXMLElement(BIN_TOKEN_TMPL % vars) |
---|
159 | header.import_node(wsse) |
---|
160 | |
---|
161 | def postprocess(self, client, response, method, args, kwargs, headers, soap_uri): |
---|
162 | "Verify the signature of the incoming response" |
---|
163 | from . import xmlsec |
---|
164 | # get xml elements: |
---|
165 | body = response('Body', ns=soap_uri, ) |
---|
166 | header = response('Header', ns=soap_uri, ) |
---|
167 | wsse = header("Security", ns=WSSE_URI) |
---|
168 | cert = wsse("BinarySecurityToken", ns=WSSE_URI) |
---|
169 | # check that the cert (binary token) is coming in the correct format: |
---|
170 | self.__check(cert["EncodingType"], Base64Binary_URI) |
---|
171 | self.__check(cert["ValueType"], X509v3_URI) |
---|
172 | # extract the certificate (in DER to avoid new line & padding issues!) |
---|
173 | cert_der = str(cert).decode("base64") |
---|
174 | public_key = xmlsec.x509_extract_rsa_public_key(cert_der, binary=True) |
---|
175 | # validate the certificate using the certification authority: |
---|
176 | if not self.cacert: |
---|
177 | warnings.warn("No CA provided, WSSE not validating certificate") |
---|
178 | elif not xmlsec.x509_verify(self.cacert, cert_der, binary=True): |
---|
179 | raise RuntimeError("WSSE certificate validation failed") |
---|
180 | # check body xml attributes was signed correctly (reference) |
---|
181 | self.__check(body['xmlns:wsu'], WSU_URI) |
---|
182 | ref_uri = body['wsu:Id'] |
---|
183 | signature = wsse("Signature", ns=XMLDSIG_URI) |
---|
184 | signed_info = signature("SignedInfo", ns=XMLDSIG_URI) |
---|
185 | signature_value = signature("SignatureValue", ns=XMLDSIG_URI) |
---|
186 | # TODO: these sanity checks should be moved to xmlsec? |
---|
187 | self.__check(signed_info("Reference", ns=XMLDSIG_URI)['URI'], "#" + ref_uri) |
---|
188 | self.__check(signed_info("SignatureMethod", ns=XMLDSIG_URI)['Algorithm'], |
---|
189 | XMLDSIG_URI + "rsa-sha1") |
---|
190 | self.__check(signed_info("Reference", ns=XMLDSIG_URI)("DigestMethod", ns=XMLDSIG_URI)['Algorithm'], |
---|
191 | XMLDSIG_URI + "sha1") |
---|
192 | # TODO: check KeyInfo uses the correct SecurityTokenReference |
---|
193 | # workaround: copy namespaces so lxml can parse the xml to be signed |
---|
194 | for attr, value in response[:]: |
---|
195 | if attr.startswith("xmlns"): |
---|
196 | body[attr] = value |
---|
197 | # use the internal tag xml representation (not the full xml document) |
---|
198 | ref_xml = xmlsec.canonicalize(repr(body)) |
---|
199 | # verify the signed hash |
---|
200 | computed_hash = xmlsec.sha1_hash_digest(ref_xml) |
---|
201 | digest_value = str(signed_info("Reference", ns=XMLDSIG_URI)("DigestValue", ns=XMLDSIG_URI)) |
---|
202 | if computed_hash != digest_value: |
---|
203 | raise RuntimeError("WSSE SHA1 hash digests mismatch") |
---|
204 | # workaround: prepare the signed info (assure the parent ns is present) |
---|
205 | signed_info['xmlns'] = XMLDSIG_URI |
---|
206 | xml = repr(signed_info) |
---|
207 | # verify the signature using RSA-SHA1 (XML Security) |
---|
208 | ok = xmlsec.rsa_verify(xml, str(signature_value), public_key) |
---|
209 | if not ok: |
---|
210 | raise RuntimeError("WSSE RSA-SHA1 signature verification failed") |
---|
211 | # TODO: remove any unsigned part from the xml? |
---|
212 | |
---|
213 | def __check(self, value, expected, msg="WSSE sanity check failed"): |
---|
214 | if value != expected: |
---|
215 | raise RuntimeError(msg) |
---|