source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/shell.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 15.2 KB
Line 
1# -*- coding: utf-8 -*-
2
3"""
4| This file is part of the web2py Web Framework
5| Developed by Massimo Di Pierro <mdipierro@cs.depaul.edu>,
6| limodou <limodou@gmail.com> and srackham <srackham@gmail.com>.
7| License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html)
8
9Web2py environment in the shell
10--------------------------------
11"""
12
13from __future__ import print_function
14
15import os
16import sys
17import code
18import copy
19import logging
20import types
21import re
22import glob
23import traceback
24import gluon.fileutils as fileutils
25from gluon.settings import global_settings
26from gluon.compileapp import build_environment, read_pyc, run_models_in
27from gluon.restricted import RestrictedError
28from gluon.globals import Request, Response, Session
29from gluon.storage import Storage, List
30from gluon.admin import w2p_unpack
31from pydal.base import BaseAdapter
32from gluon._compat import iteritems, ClassType, PY2
33
logger = logging.getLogger("web2py")

if not PY2:
    def execfile(filename, global_vars=None, local_vars=None):
        """Python 3 stand-in for the Python 2 ``execfile`` builtin.

        Reads ``filename`` as bytes, compiles it with ``filename`` as the
        reported source name and executes the result with the given
        globals/locals mappings.
        """
        with open(filename, "rb") as source_file:
            source = source_file.read()
        compiled = compile(source, filename, 'exec')
        exec(compiled, global_vars, local_vars)

    # Python 3 renamed ``raw_input`` to ``input``; keep the old name usable.
    raw_input = input
42
43
def enable_autocomplete_and_history(adir, env):
    """Turn on tab completion and persistent history for the console.

    Does nothing when ``readline``/``rlcompleter`` cannot be imported.
    Otherwise the history is read from ``<adir>/.pythonhistory`` (the file is
    created empty if reading fails with IOError), written back at interpreter
    exit, and completion is bound to the names found in ``env``.
    """
    try:
        import rlcompleter
        import atexit
        import readline
    except ImportError:
        # No line-editing support available: silently keep the plain console.
        return

    readline.parse_and_bind("tab: complete")
    history_path = os.path.join(adir, '.pythonhistory')
    try:
        readline.read_history_file(history_path)
    except IOError:
        # Nothing readable yet; make sure the file exists for the exit hook.
        open(history_path, 'a').close()
    atexit.register(readline.write_history_file, history_path)
    completer = rlcompleter.Completer(env)
    readline.set_completer(completer.complete)
60
61
REGEX_APP_PATH = '(?:.*/)?applications/(?P<a>[^/]+)'


def exec_environment(pyfile='', request=None, response=None, session=None):
    """Build a web2py environment and optionally run a Python file in it.

    Missing ``request``/``response``/``session`` objects are created with
    their default constructors.  When ``request.folder`` is None it is derived
    from ``pyfile``: if the path matches ``REGEX_APP_PATH`` the folder becomes
    the absolute path of ``applications/<a>``, otherwise the empty string.

    If ``pyfile`` is given, its compiled sibling (``pyfile + 'c'``) is
    executed when that file exists; otherwise the source file is executed.

    Returns a Storage wrapping the resulting environment.  The working
    directory must be the web2py root -- this is the web2py default.
    """
    request = Request({}) if request is None else request
    response = Response() if response is None else response
    session = Session() if session is None else session

    if request.folder is None:
        app_match = re.match(REGEX_APP_PATH, pyfile)
        if app_match:
            app_name = app_match.group('a')
            request.folder = os.path.abspath(
                os.path.join('applications', app_name))
        else:
            request.folder = ''

    environment = build_environment(
        request, response, session, store_current=False)
    if not pyfile:
        return Storage(environment)

    compiled_path = pyfile + 'c'
    if os.path.isfile(compiled_path):
        exec(read_pyc(compiled_path), environment)
    else:
        execfile(pyfile, environment)
    return Storage(environment)
102
103
def env(
    a,
    import_models=False,
    c=None,
    f=None,
    dir='',
    extra_request=None,
):
    """
    Returns web2py execution environment for application (a), controller (c),
    function (f).
    If import_models is True then exec all application models into the
    environment.

    dir, when non-empty, is used as request.folder instead of the default
    ``applications/<a>``.

    extra_request allows you to pass along any extra variables to the request
    object before your models get executed. This was mainly done to support
    web2py_utils.test_runner, however you can use it with any wrapper scripts
    that need access to the web2py environment. Each key/value pair is set as
    an attribute on the request; None (the default) means no extras.

    Side effects: replaces fileutils.check_credentials with a function that
    always returns True, and calls sys.exit(1) if running the models raises
    RestrictedError.
    """
    # A None default avoids sharing one mutable dict between calls.
    if extra_request is None:
        extra_request = {}

    request = Request({})
    response = Response()
    session = Session()
    request.application = a

    # Populate the dummy environment with sensible defaults.

    request.folder = dir or os.path.join('applications', a)
    controller = c or 'default'
    function = f or 'index'
    request.controller = controller
    request.function = function
    response.view = '%s/%s.html' % (request.controller,
                                    request.function)
    cmd_opts = global_settings.cmd_options
    if cmd_opts:
        if not cmd_opts.interfaces:
            ip = cmd_opts.ip
            port = cmd_opts.port
        else:
            # Use the first configured interface; its first two items are
            # read as (ip, port).
            first_if = cmd_opts.interfaces[0]
            ip = first_if[0]
            port = first_if[1]
        request.is_shell = cmd_opts.shell is not None
    else:
        ip = '127.0.0.1'
        port = 8000
        request.is_shell = False
    request.is_scheduler = False
    request.env.http_host = '%s:%s' % (ip, port)
    request.env.remote_addr = '127.0.0.1'
    request.env.web2py_runtime_gae = global_settings.web2py_runtime_gae

    for k, v in extra_request.items():
        setattr(request, k, v)

    # Build the path from the defaulted controller/function so an omitted
    # c or f does not end up as the literal text "None" in path_info.
    path_info = '/%s/%s/%s' % (a, controller, function)
    if request.args:
        path_info = '%s/%s' % (path_info, '/'.join(request.args))
    if request.vars:
        query_parts = ['%s=%s' % (k, v) if v else '%s' % k
                       for (k, v) in iteritems(request.vars)]
        path_info = '%s?%s' % (path_info, '&'.join(query_parts))
    request.env.path_info = path_info

    # Monkey patch so credentials checks pass.

    def check_credentials(request, other_application='admin'):
        return True

    fileutils.check_credentials = check_credentials

    environment = build_environment(request, response, session)

    if import_models:
        try:
            run_models_in(environment)
        except RestrictedError as e:
            sys.stderr.write(e.traceback + '\n')
            sys.exit(1)

    # The view environment is a shallow copy taken before __name__ is set.
    response._view_environment = copy.copy(environment)

    environment['__name__'] = '__main__'
    return environment
189
190
def exec_pythonrc():
    """Execute the PYTHONSTARTUP script and return the names it defines.

    Returns a dict of the script's top-level names, or an empty dict when
    PYTHONSTARTUP is unset, does not point to an existing file, or the script
    raises NameError.  Any other exception raised by the script propagates.
    """
    pythonrc = os.environ.get('PYTHONSTARTUP')
    if pythonrc and os.path.isfile(pythonrc):
        # Run the script in an explicit namespace.  Relying on locals() of a
        # wrapper function does not work on Python 3: the executed code cannot
        # add names to the caller's local scope, so nothing was returned.
        namespace = {}
        try:
            with open(pythonrc, 'rb') as rc_file:
                rc_code = compile(rc_file.read(), pythonrc, 'exec')
            exec(rc_code, namespace)
        except NameError:
            pass
        else:
            # exec() inserts __builtins__ into the globals it is given; do not
            # hand that on to the environment this dict gets merged into.
            namespace.pop('__builtins__', None)
            return namespace
    return dict()
202
203
def die(msg, exit_status=1, error_preamble=True):
    """Print ``msg`` on stderr and terminate via ``sys.exit(exit_status)``.

    With ``error_preamble`` true the message is prefixed with
    ``"<sys.argv[0]>: error: "``.
    """
    text = "%s: error: %s" % (sys.argv[0], msg) if error_preamble else msg
    print(text, file=sys.stderr)
    sys.exit(exit_status)
209
210
def _exec_code(source, environment):
    """Execute ``source`` (code object or string) with ``environment`` as globals."""
    exec(source, environment)


def _exec_startfile(startfile, environment):
    """Execute ``startfile`` in ``environment``; ``.pyc`` files go through read_pyc."""
    if startfile.endswith('.pyc'):
        exec(read_pyc(startfile), environment)
    else:
        execfile(startfile, environment)


def _run_then_close_db(import_models, action, *action_args):
    """Call ``action(*action_args)`` and finish with commit or rollback.

    When ``import_models`` is true, ``BaseAdapter.close_all_instances`` is
    called with 'commit' after a successful run and with 'rollback' after a
    failure.  The traceback of a failure is printed.  SystemExit is re-raised
    after the rollback; every other exception is swallowed once reported.
    """
    try:
        action(*action_args)
        if import_models:
            BaseAdapter.close_all_instances('commit')
    except SystemExit:
        print(traceback.format_exc())
        if import_models:
            BaseAdapter.close_all_instances('rollback')
        raise
    except:
        # Deliberately broad: a failing script must not take the caller down.
        print(traceback.format_exc())
        if import_models:
            BaseAdapter.close_all_instances('rollback')


def run(
    appname,
    plain=False,
    import_models=False,
    startfile=None,
    bpython=False,
    python_code=None,
    cron_job=False,
    scheduler_job=False,
    force_migrate=False,
    fake_migrate=False):
    """
    Start interactive shell or run Python script (startfile) in web2py
    controller environment. appname is formatted like:

    - a : web2py application name
    - a/c : exec the controller c into the application environment
    - a/c/f : exec the controller c, then the action f
              into the application environment
    - a/c/f?x=y : as above

    Other parameters:

    - plain: skip bpython/IPython and use the standard Python console.
    - import_models: run the application models; forced to True when a
      controller is given or force_migrate is set.
    - startfile: path of a .py or .pyc file executed in the environment.
    - bpython: try the bpython shell instead of IPython.
    - python_code: code executed in the environment (used when no startfile).
    - cron_job: never prompt; prefer the compiled controller when it exists.
    - scheduler_job: never prompt; sets request.is_scheduler to True.
    - force_migrate: patch DAL.__init__ to force migrate/migrate_enabled,
      load the appadmin controller and, when neither startfile nor
      python_code is given, run scripts/migrator.py.
    - fake_migrate: value forced for DAL's fake_migrate under force_migrate.

    When a function f is given, ``f()`` is called and printed and run()
    returns immediately afterwards (startfile/python_code are not used).
    """

    (a, c, f, args, request_vars) = parse_path_info(appname, av=True)
    errmsg = 'invalid application name: %s' % appname
    if not a:
        die(errmsg, error_preamble=False)
    adir = os.path.join('applications', a)

    if not os.path.exists(adir):
        if not cron_job and not scheduler_job and \
            sys.stdin and not sys.stdin.name == '/dev/null':
            confirm = raw_input(
                'application %s does not exist, create (y/n)?' % a)
        else:
            # logging.warn is a deprecated alias (removed in Python 3.13).
            logging.warning('application does not exist and will not be created')
            return
        if confirm.lower() in ('y', 'yes'):
            os.mkdir(adir)
            fileutils.create_app(adir)

    if force_migrate:
        c = 'appadmin' # Load all models (hack already used for appadmin controller)
        import_models = True
        from gluon.dal import DAL
        orig_init = DAL.__init__

        def custom_init(*args, **kwargs):
            kwargs['migrate_enabled'] = True
            kwargs['migrate'] = True
            kwargs['fake_migrate'] = fake_migrate
            logger.info('Forcing migrate_enabled=True')
            orig_init(*args, **kwargs)

        # NOTE: the patch is never undone within this function.
        DAL.__init__ = custom_init

    if c:
        import_models = True
    extra_request = {}
    if args:
        extra_request['args'] = args
    if scheduler_job:
        extra_request['is_scheduler'] = True
    if request_vars:
        # underscore necessary because request.vars is a property
        extra_request['_vars'] = request_vars
    _env = env(a, c=c, f=f, import_models=import_models, extra_request=extra_request)

    if c:
        pyfile = os.path.join('applications', a, 'controllers', c + '.py')
        pycfile = os.path.join('applications', a, 'compiled',
                                 "controllers.%s.%s.pyc" % (c, f))
        has_source = os.path.isfile(pyfile)
        # Use the compiled controller for cron jobs or when there is no
        # source, but only if it really exists; otherwise fall through so a
        # missing controller is reported via die() instead of failing inside
        # read_pyc.
        if os.path.isfile(pycfile) and (cron_job or not has_source):
            exec(read_pyc(pycfile), _env)
        elif has_source:
            execfile(pyfile, _env)
        else:
            die(errmsg, error_preamble=False)

    if f:
        exec('print( %s())' % f, _env)
        return

    _env.update(exec_pythonrc())
    if startfile:
        _run_then_close_db(import_models, _exec_startfile, startfile, _env)
    elif python_code:
        _run_then_close_db(import_models, _exec_code, python_code, _env)
    elif force_migrate:
        _run_then_close_db(import_models, execfile,
                           "scripts/migrator.py", _env)
    else:
        if not plain:
            if bpython:
                try:
                    import bpython
                    bpython.embed(locals_=_env)
                    return
                except:
                    logger.warning(
                        'import bpython error; trying ipython...')
            else:
                try:
                    import IPython
                    # Version strings are compared lexicographically here.
                    if IPython.__version__ > '1.0.0':
                        IPython.start_ipython(user_ns=_env)
                        return
                    elif IPython.__version__ == '1.0.0':
                        from IPython.terminal.embed import InteractiveShellEmbed
                        shell = InteractiveShellEmbed(user_ns=_env)
                        shell()
                        return
                    elif IPython.__version__ >= '0.11':
                        from IPython.frontend.terminal.embed import InteractiveShellEmbed
                        shell = InteractiveShellEmbed(user_ns=_env)
                        shell()
                        return
                    else:
                        # following 2 lines fix a problem with
                        # IPython; thanks Michael Toomim
                        if '__builtins__' in _env:
                            del _env['__builtins__']
                        shell = IPython.Shell.IPShell(argv=[], user_ns=_env)
                        shell.mainloop()
                        return
                except:
                    logger.warning(
                        'import IPython error; use default python shell')
        # Reached for plain=True or when the richer shell could not be used.
        enable_autocomplete_and_history(adir, _env)
        code.interact(local=_env)
382
383
def parse_path_info(path_info, av=False):
    """
    Parses path info formatted like a/c/f where c and f are optional
    and a leading `/` is accepted.
    Return tuple (a, c, f). If invalid path_info a is set to None.
    If c or f are omitted they are set to None.
    If av=True, parse args and vars: the result is then the 5-tuple
    (a, c, f, args, vars) where args is a List of the path items after f
    (or None) and vars is a Storage built from the query string (or None
    when there is no `?`). A variable without `=` gets the value None.
    In this mode the path items are not validated.
    """
    if av:
        query_vars = None
        if '?' in path_info:
            # maxsplit=1: only the first '?' separates path and query, so a
            # later '?' stays in the query instead of breaking the unpack.
            path_info, query = path_info.split('?', 1)
            query_vars = Storage()
            for pair in query.split('&'):
                if '=' in pair:
                    # maxsplit=1 keeps any further '=' inside the value.
                    key, val = pair.split('=', 1)
                else:
                    key, val = pair, None
                query_vars[key] = val
        # Accept one leading '/', as the non-av form does.
        if path_info.startswith('/'):
            path_info = path_info[1:]
        items = List(path_info.split('/'))
        args = List(items[3:]) if len(items) > 3 else None
        return (items(0), items(1), items(2), args, query_vars)

    mo = re.match(r'^/?(?P<a>\w+)(/(?P<c>\w+)(/(?P<f>\w+))?)?$',
                  path_info)
    if mo:
        return (mo.group('a'), mo.group('c'), mo.group('f'))
    else:
        return (None, None, None)
410
411
def test(testpath, import_models=True, verbose=False):
    """
    Run doctests in web2py environment. testpath is formatted like:

    - a: tests all controllers in application a
    - a/c: tests controller c in application a
    - a/c/f  test function f in controller c, application a

    Where a, c and f are application, controller and function names
    respectively. If the testpath is a file name the file is tested.
    If a controller is specified models are executed by default.

    Invalid paths are reported through die(), which exits the process.
    """

    import doctest
    if os.path.isfile(testpath):
        mo = re.match(REGEX_APP_PATH, testpath)
        if not mo:
            die('test file is not in application directory: %s'
                % testpath)
        a = mo.group('a')
        c = f = None
        files = [testpath]
    else:
        (a, c, f) = parse_path_info(testpath)
        errmsg = 'invalid test path: %s' % testpath
        if not a:
            die(errmsg)
        cdir = os.path.join('applications', a, 'controllers')
        if not os.path.isdir(cdir):
            die(errmsg)
        if c:
            cfile = os.path.join(cdir, c + '.py')
            if not os.path.isfile(cfile):
                die(errmsg)
            files = [cfile]
        else:
            files = glob.glob(os.path.join(cdir, '*.py'))

    # types.UnboundMethodType only exists on Python 2, where it is the same
    # object as types.MethodType, so listing MethodType alone covers both.
    testable_types = (types.FunctionType, type, ClassType, types.MethodType)
    class_types = (type, ClassType)

    def doctest_object(testfile, name, obj):
        """doctest obj and enclosed methods and classes.

        name is the expression that evaluates to obj inside the test
        environment (e.g. ``MyClass.method``); it is also used as the label.
        """
        if type(obj) not in testable_types:
            return

        # Reload environment before each test.

        globs = env(a, c=c, f=f, import_models=import_models)
        execfile(testfile, globs)
        doctest.run_docstring_examples(
            obj, globs=globs,
            name='%s: %s' % (os.path.basename(testfile), name),
            verbose=verbose)
        if type(obj) in class_types:
            for attr_name in dir(obj):
                # __class__ is the metaclass, not a member; following it
                # would recurse into ``type`` itself.
                if attr_name == '__class__':
                    continue

                # Execute . operator so decorators are executed.  The
                # qualified name keeps the lookup valid for nested levels.

                qualified = '%s.%s' % (name, attr_name)
                member = eval(qualified, globs)
                doctest_object(testfile, qualified, member)

    for testfile in files:
        globs = env(a, import_models)
        # Snapshot the names that exist before the controller runs.  On
        # Python 3 keys() is a live view, which would also contain every
        # name the controller defines and so exclude all of them.
        ignores = set(globs)
        execfile(testfile, globs)

        for (name, obj) in list(globs.items()):
            if name not in ignores and (f is None or f == name):
                doctest_object(testfile, name, obj)
Note: See TracBrowser for help on using the repository browser.