source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/contrib/webclient.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 7.9 KB
Line 
1"""
2Developed by Massimo Di Pierro
3Released under the web2py license (LGPL)
4
5It is an interface on top of urllib2 which simplifies scripting of HTTP requests
6mostly for testing purposes
7
8- customizable
9- supports basic auth
10- supports cookies
11- supports session cookies (tested with web2py sessions)
12- detects broken session
13- detects web2py form postbacks and handles formname and formkey
14- detects web2py tickets
15
16Some examples at the bottom.
17"""
18from __future__ import print_function
19from gluon._compat import urllib2, cookielib, iteritems, to_native, urlencode, to_bytes
20import re
21import time
22
23
# Headers merged into every request unless the caller supplies the same key.
DEFAULT_HEADERS = {
    'user-agent': 'Mozilla/4.0',  # some servers are picky
    'accept-language': 'en',
}

# Matches the hidden inputs web2py emits for a form: an optional `_formkey`
# input immediately followed by the `_formname` input.  Raw strings are used
# so the backslashes reach the regex engine without triggering Python's
# "invalid escape sequence" warning; the resulting pattern is unchanged.
FORM_REGEX = re.compile(
    r'(\<input name\="_formkey" type\="hidden" value\="(?P<formkey>.+?)" \/\>)?'
    r'\<input name\="_formname" type\="hidden" value\="(?P<formname>.+?)" \/\>')

# Cookie names of the form `session_id_<name>` are treated as session cookies.
SESSION_REGEX = 'session_id_(?P<name>.+)'
32
33
class WebClient(object):
    """Small scripted HTTP client on top of urllib2, mostly for testing.

    After each request the following attributes describe the last exchange:

    - ``url``: the full URL requested (``app`` + the relative ``url``)
    - ``method``: the method recorded for the request
    - ``response``: the urllib2 response object (or the ``HTTPError``)
    - ``status``: the HTTP status code, or None if unavailable
    - ``text``: the response body (decoded when ``charset`` is truthy)
    - ``headers``: response headers keyed by lower-cased name
    - ``cookies``: cookies parsed from the ``set-cookie`` response header
    - ``forms``: ``{formname: formkey}`` for web2py forms found in ``text``
    - ``time``: seconds spent in the request

    ``sessions`` maps session names to the last seen session id and
    ``history`` accumulates ``(method, url, status, time)`` tuples.
    """

    def __init__(self,
                 app='',
                 postbacks=True,
                 default_headers=DEFAULT_HEADERS,
                 session_regex=SESSION_REGEX):
        """Create a client.

        Args:
            app: prefix prepended to every url passed to get/post.
            postbacks: if true, a form POST to a url different from the
                last visited one is preceded by a GET of that url, so the
                form key can be picked up.
            default_headers: headers added to each request unless the
                caller provides the same key.
            session_regex: pattern (with a ``name`` group) identifying
                session cookies; a falsy value disables session tracking.
        """
        self.app = app
        self.postbacks = postbacks
        self.forms = {}
        self.history = []
        self.cookies = {}
        self.cookiejar = cookielib.CookieJar()
        self.default_headers = default_headers
        self.sessions = {}
        # Normalise every falsy value to None so the session check in post()
        # never calls .match on '' or False.
        self.session_regex = re.compile(session_regex) if session_regex else None
        self.headers = {}

    def _parse_headers_in_cookies(self):
        """Rebuild ``self.cookies`` from the ``set-cookie`` response header."""
        self.cookies = {}
        if 'set-cookie' in self.headers:
            for item in self.headers['set-cookie'].split(','):
                # Keep only the name=value pair, dropping cookie attributes.
                # partition() is safe when there is no ';' (find() would
                # return -1 and silently chop the last character).
                cookie = item.partition(';')[0]
                key, sep, value = cookie.partition('=')
                if not sep:
                    # Not a name=value pair, e.g. the tail of an
                    # "expires=Wed, 21-Oct-..." date split on its comma.
                    continue
                self.cookies[key.strip()] = value.strip()

    def get(self, url, cookies=None, headers=None, auth=None):
        """Issue a GET request; see post() for the arguments."""
        return self.post(url, data=None, cookies=cookies,
                         headers=headers, auth=auth, method='GET')

    def post(self, url, data=None, cookies=None,
             headers=None, auth=None, method='auto', charset='utf-8'):
        """Perform an HTTP request and record the outcome on the instance.

        Args:
            url: path appended to ``self.app``.
            data: a str or dict to send as the request body, or None.
                A dict is url-encoded; the caller's dict is not modified.
            cookies: cookies to send as ``Cookie`` headers; when None the
                cookies parsed from the previous response are reused.
            headers: extra request headers; a value may be a list/tuple to
                add the header several times.  The caller's dict is not
                modified.
            auth: keyword arguments for
                ``HTTPBasicAuthHandler.add_password``; enables basic auth.
            method: method name recorded in ``self.method``/``history``;
                'auto' records POST when data is a str or dict, else GET.
            charset: encoding used to decode the body; 'auto' reads it from
                the response Content-Type (falling back to utf-8); a falsy
                value leaves the body undecoded and skips form detection.

        Raises:
            RuntimeError: on an HTTP error whose response carries a
                ``web2py_error`` header (a web2py ticket).
            urllib2.HTTPError: on any other HTTP error.
        """
        self.url = self.app + url

        # if this POST form requires a postback do it
        if data and '_formname' in data and self.postbacks and \
                self.history and self.history[-1][1] != self.url:
            # to bypass the web2py CSRF need to get formkey
            # before submitting the form
            self.get(url, cookies=cookies, headers=headers, auth=auth)

        # unless cookies are specified, recycle cookies
        if cookies is None:
            cookies = self.cookies
        cookies = cookies or {}
        # work on a copy so the caller's headers dict is left untouched
        headers = dict(headers or {})

        handlers = [
            urllib2.HTTPCookieProcessor(self.cookiejar),
            urllib2.HTTPHandler(debuglevel=0),
        ]
        # if required do basic auth
        if auth:
            auth_handler = urllib2.HTTPBasicAuthHandler()
            auth_handler.add_password(**auth)
            handlers.append(auth_handler)

        opener = urllib2.build_opener(*handlers)

        # fill in default headers the caller did not override
        for key, value in iteritems(self.default_headers):
            if key not in headers:
                headers[key] = value

        # copy headers from dict to list of key,value
        headers_list = []
        for key, value in iteritems(headers):
            if isinstance(value, (list, tuple)):
                for v in value:
                    headers_list.append((key, v))
            else:
                headers_list.append((key, value))

        # move cookies to headers
        for key, value in iteritems(cookies):
            headers_list.append(('Cookie', '%s=%s' % (key, value)))

        # add headers to request
        for key, value in headers_list:
            opener.addheaders.append((key, str(value)))

        # decide the method and prepare the request body
        if isinstance(data, str):
            self.method = 'POST' if method == 'auto' else method
        elif isinstance(data, dict):
            self.method = 'POST' if method == 'auto' else method
            # the form fields below are added to a copy, not the caller's dict
            data = data.copy()

            # if there is only one form, set _formname automatically
            if '_formname' not in data and len(self.forms) == 1:
                data['_formname'] = next(iter(self.forms))

            # if there is no formkey but it is known, set it
            if '_formname' in data and '_formkey' not in data and \
                    data['_formname'] in self.forms:
                data['_formkey'] = self.forms[data['_formname']]

            data = urlencode(data, doseq=True)
        else:
            self.method = 'GET' if method == 'auto' else method
            data = None

        # assume everything is ok, make the http request and time it
        error = None
        t0 = time.time()
        try:
            self.response = opener.open(self.url, to_bytes(data))
        except urllib2.HTTPError as er:
            # keep the error response so status/body/headers can be inspected
            error = er
            self.response = er
        self.time = time.time() - t0

        if hasattr(self.response, 'getcode'):
            self.status = self.response.getcode()
        else:  # python2.5
            self.status = None

        raw_headers = self.response.headers

        self.text = self.response.read()
        if charset:
            if charset == 'auto':
                if hasattr(raw_headers, 'get_content_charset'):
                    # Python 3 message API
                    charset = raw_headers.get_content_charset()
                else:
                    # Python 2 message API
                    charset = raw_headers.getparam('charset')
                # the response may not declare a charset at all
                charset = charset or 'utf-8'
            self.text = to_native(self.text, charset)

        # In PY3 self.response.headers are case sensitive, and indexing a
        # repeated header (e.g. several Set-Cookie) yields only the first
        # value; join all values so they are seen as in PY2.
        self.headers = dict()
        for h in raw_headers:
            if hasattr(raw_headers, 'get_all'):
                values = raw_headers.get_all(h) or []
                self.headers[h.lower()] = ', '.join(str(v) for v in values)
            else:
                self.headers[h.lower()] = raw_headers[h]

        # treat web2py tickets as special types of errors
        if error is not None:
            if 'web2py_error' in self.headers:
                raise RuntimeError(self.headers['web2py_error'])
            else:
                raise error

        self._parse_headers_in_cookies()

        # check if a new session id has been issued, symptom of broken session
        if self.session_regex:
            for cookie, value in iteritems(self.cookies):
                match = self.session_regex.match(cookie)
                if match:
                    name = match.group('name')
                    if name in self.sessions and self.sessions[name] != value:
                        # reported, not raised: the request itself succeeded
                        print(RuntimeError('Changed session ID %s' % name))
                    self.sessions[name] = value

        # find all forms and formkeys in page
        if charset:
            self.forms = {}
            for match in FORM_REGEX.finditer(self.text):
                self.forms[match.group('formname')] = match.group('formkey')

        # log this request
        self.history.append((self.method, self.url, self.status, self.time))
187
188
def test_web2py_registration_and_login():
    """Register, log out, log in and check the profile page of a local app.

    Talks to the ``welcome`` application at http://127.0.0.1:8000, so a
    web2py instance has to be started for testing beforehand.  After the
    final assertion the client's state and request history are printed.
    """
    # from gluon.contrib.webclient import WebClient

    client = WebClient('http://127.0.0.1:8000/welcome/default/')
    client.get('index')

    # create the account
    registration = dict(
        first_name='Homer',
        last_name='Simpson',
        email='homer@web2py.com',
        password='test',
        password_two='test',
        _formname='register',
    )
    client.post('user/register', data=registration)

    # drop the session created by the registration
    client.get('user/logout')

    # sign back in with the same credentials
    credentials = dict(
        email='homer@web2py.com',
        password='test',
        _formname='login',
    )
    client.post('user/login', data=credentials)

    # the profile page greets the user only if both steps worked
    client.get('user/profile')
    assert 'Welcome Homer' in client.text

    # dump the client state, one labelled section per attribute
    for label in ('sessions', 'headers', 'cookies', 'forms'):
        print('\n%s:\n' % label, getattr(client, label))
    print()
    # one line per request: method, url, status, elapsed time
    for entry in client.history:
        print(*entry)
226
# When executed as a script, run the registration/login example above.
if __name__ == '__main__':
    test_web2py_registration_and_login()
Note: See TracBrowser for help on using the repository browser.