# -*- coding: utf-8 -*- # vim: set ts=4 sw=4 et ai: """ | This file is part of the web2py Web Framework | Copyrighted by Massimo Di Pierro | License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html) Command line interface ---------------------- The processing of all command line arguments is done using the argparse library in the console function. The basic principle is to process and check for all options in a single place, this place is the parse_args function. Notice that when I say all options I mean really all, options sourced from a configuration file are included. A brief summary of options style follows, for the benefit of code maintainers/developers: - use the underscore to split words in long names (as in '--run_system_tests') - remember to allow the '-' too as word separator (e.g. '--run-system-tests') but do not use this form on help (add the minus version of the option to _omitted_opts to hide it in usage help) - prefer short names on help messages, instead use all options names in warning/error messages (e.g. '-R/--run requires -S/--shell') Notice that options must be included into opt_map dictionary (defined in parse_args function) to be available in configuration file. """ from __future__ import print_function __author__ = 'Paolo Pastori' import os.path import argparse import logging import socket import sys import re import ast from collections import OrderedDict import copy from gluon._compat import PY2 from gluon.shell import die from gluon.utils import is_valid_ip_address from gluon.settings import global_settings def warn(msg): print("%s: warning: %s" % (sys.argv[0], msg), file=sys.stderr) def is_appdir(applications_parent, app): return os.path.isdir(os.path.join(applications_parent, 'applications', app)) def console(version): """ Load command line options. Trivial -h/--help and --version options are also processed. Returns a namespace object (in the sense of argparse) with all options loaded. """ # replacement hints for deprecated options deprecated_opts = { '--debug': '--log_level', '--nogui': '--no_gui', '--ssl_private_key': '--server_key', '--ssl_certificate': '--server_cert', '--interfaces': None, # dest is 'interfaces', hint is '--interface' '-n': '--min_threads', '--numthreads': '--min_threads', '--minthreads': '--min_threads', '--maxthreads': '--max_threads', '-z': None, '--shutdown_timeout': None, '--profiler': '--profiler_dir', '--run-cron': '--with_cron', '--softcron': '--soft_cron', '--cron': '--cron_run', '--test': '--run_doctests' } class HelpFormatter2(argparse.HelpFormatter): """Hides the options listed in _hidden_options in usage help.""" # NOTE: preferred style for long options name is to use '_' # between words (as in 'no_gui'), also accept the '-' in # most of the options but do not show both versions on help _omitted_opts = ('--add-options', '--errors-to-console', '--no-banner', '--log-level', '--no-gui', '--import-models', '--force-migrate', '--server-name', '--server-key', '--server-cert', '--ca-cert', '--pid-filename', '--log-filename', '--min-threads', '--max-threads', '--request-queue-size', '--socket-timeout', '--profiler-dir', '--with-scheduler', '--with-cron', '--cron-threads', '--soft-cron', '--cron-run', '--run-doctests', '--run-system-tests', '--with-coverage') _hidden_options = _omitted_opts + tuple(deprecated_opts.keys()) def _format_action_invocation(self, action): if not action.option_strings: return super(HelpFormatter2, self)._format_action_invocation(action) parts = [] if action.nargs == 0: parts.extend(filter(lambda o : o not in self._hidden_options, action.option_strings)) else: default = action.dest.upper() args_string = self._format_args(action, default) for option_string in action.option_strings: if option_string in self._hidden_options: continue parts.append('%s %s' % (option_string, args_string)) return ', '.join(parts) class ExtendAction(argparse._AppendAction): """Action to accumulate values in a flat list.""" def __call__(self, parser, namespace, values, option_string=None): if isinstance(values, list): # must copy to avoid altering the option default value value = getattr(namespace, self.dest, None) if value is None: value = [] setattr(namespace, self.dest, value) items = value[:] # for options that allows multiple args (i.e. those declared # with add_argument(..., nargs='+', ...)) the values are # always placed into a list while len(values) == 1 and isinstance(values[0], list): values = values[0] items.extend(values) setattr(namespace, self.dest, items) else: super(ExtendAction, self).__call__(parser, namespace, values, option_string) parser = argparse.ArgumentParser( usage='python %(prog)s [options]', description='web2py Web Framework startup script.', epilog='''NOTE: unless a password is specified (-a 'passwd') web2py will attempt to run a GUI to ask for it when starting the web server (if not disabled with --no_gui).''', formatter_class=HelpFormatter2, add_help=False) # do not add -h/--help option # global options g = parser.add_argument_group('global options') g.add_argument('-h', '--help', action='help', help='show this help message and exit') g.add_argument('--version', action='version', version=version, help="show program's version and exit") folder = os.getcwd() g.add_argument('-f', '--folder', default=folder, metavar='WEB2PY_DIR', help='web2py installation directory (%(default)s)') def existing_file(v): if not v: raise argparse.ArgumentTypeError('empty argument') if not os.path.exists(v): raise argparse.ArgumentTypeError("file %r not found" % v) return v g.add_argument('-L', '--config', type=existing_file, metavar='PYTHON_FILE', help='read all options from PYTHON_FILE') g.add_argument('--add_options', '--add-options', default=False, action='store_true', help= 'add options to existing ones, useful with -L only') g.add_argument('-a', '--password', default='', help= 'password to be used for administration (use "" ' 'to reuse the last password), when no password is available ' 'the administrative web interface will be disabled') g.add_argument('-e', '--errors_to_console', '--errors-to-console', default=False, action='store_true', help='log application errors to console') g.add_argument('--no_banner', '--no-banner', default=False, action='store_true', help='do not print header banner') g.add_argument('-Q', '--quiet', default=False, action='store_true', help='disable all output') integer_log_level = [] def log_level(v): # try to convert a lgging level name to its numeric value, # could use logging.getLevelName but not with # 3.4 <= Python < 3.4.2, see # https://docs.python.org/3/library/logging.html#logging.getLevelName) try: name2level = logging._levelNames except AttributeError: # logging._levelNames has gone with Python 3.4, see # https://github.com/python/cpython/commit/3b84eae03ebd8122fdbdced3d85999dd9aedfc7e name2level = logging._nameToLevel try: return name2level[v.upper()] except KeyError: pass try: ill = int(v) # value deprecated: integer in range(101) if 0 <= ill <= 100: integer_log_level.append(ill) return ill except ValueError: pass raise argparse.ArgumentTypeError("bad level %r" % v) g.add_argument('-D', '--log_level', '--log-level', '--debug', # deprecated default='WARNING', type=log_level, metavar='LOG_LEVEL', help= 'set log level, allowed values are: NOTSET, DEBUG, INFO, WARN, ' 'WARNING, ERROR, and CRITICAL, also lowercase (default is ' '%(default)s)') # GUI options g = parser.add_argument_group('GUI options') g.add_argument('--no_gui', '--no-gui', '--nogui', # deprecated default=False, action='store_true', help='do not run GUI') g.add_argument('-t', '--taskbar', default=False, action='store_true', help='run in taskbar (system tray)') # console options g = parser.add_argument_group('console options') g.add_argument('-S', '--shell', metavar='APP_ENV', help= 'run web2py in Python interactive shell or IPython (if installed) ' 'with specified application environment (if application does not ' 'exist it will be created). APP_ENV like a/c/f?x=y (c, f and vars ' 'optional), if APP_ENV include the action f then after the ' 'action execution the interpreter is exited') g.add_argument('-B', '--bpython', default=False, action='store_true', help= 'use bpython (if installed) when running in interactive shell, ' 'see -S above') g.add_argument('-P', '--plain', default=False, action='store_true', help= 'use plain Python shell when running in interactive shell, ' 'see -S above') g.add_argument('-M', '--import_models', '--import-models', default=False, action='store_true', help= 'auto import model files when running in interactive shell ' '(default is %(default)s), see -S above. NOTE: when the APP_ENV ' 'argument of -S include a controller c automatic import of ' 'models is always enabled') g.add_argument('--fake_migrate', default=False, action='store_true', help= 'force DAL to fake migrate all tables; ' 'monkeypatch in the DAL class to force _fake_migrate=True') g.add_argument('--force_migrate', '--force-migrate', default=False, action='store_true', help= 'force DAL to migrate all tables that should be migrated when enabled; ' 'monkeypatch in the DAL class to force _migrate_enabled=True') g.add_argument('-R', '--run', type=existing_file, metavar='PYTHON_FILE', help= 'run PYTHON_FILE in web2py environment; require -S') g.add_argument('-A', '--args', default=[], nargs=argparse.REMAINDER, help= 'use this to pass arguments to the PYTHON_FILE above; require ' '-R. NOTE: must be the last option because eat all remaining ' 'arguments') # web server options g = parser.add_argument_group('web server options') g.add_argument('-s', '--server_name', '--server-name', default=socket.gethostname(), help='web server name (%(default)s)') def ip_addr(v): if not is_valid_ip_address(v): raise argparse.ArgumentTypeError("bad IP address %s" % v) return v g.add_argument('-i', '--ip', default='127.0.0.1', type=ip_addr, metavar='IP_ADDR', help= 'IP address of the server (%(default)s), accept either IPv4 or ' 'IPv6 (e.g. ::1) addresses. NOTE: this option is ignored if ' '--interface is specified') def not_negative_int(v, err_label='value'): try: iv = int(v) if iv < 0: raise ValueError() return iv except ValueError: pass raise argparse.ArgumentTypeError("bad %s %s" % (err_label, v)) def port(v): return not_negative_int(v, err_label='port') g.add_argument('-p', '--port', default=8000, type=port, metavar='NUM', help= 'port of server (%(default)d). ' 'NOTE: this option is ignored if --interface is specified') g.add_argument('-k', '--server_key', '--server-key', '--ssl_private_key', # deprecated type=existing_file, metavar='FILE', help='server private key') g.add_argument('-c', '--server_cert', '--server-cert', '--ssl_certificate', # deprecated type=existing_file, metavar='FILE', help='server certificate') g.add_argument('--ca_cert', '--ca-cert', type=existing_file, metavar='FILE', help='CA certificate') def iface(v, sep=','): if not v: raise argparse.ArgumentTypeError('empty argument') if sep == ':': # deprecated --interfaces ip:port:key:cert:ca_cert # IPv6 addresses in square brackets if v.startswith('['): # IPv6 ip, v_remainder = v.split(']', 1) ip = ip[1:] ifp = v_remainder[1:].split(':') ifp.insert(0, ip) else: # IPv4 ifp = v.split(':') else: # --interface ifp = v.split(sep, 5) if not len(ifp) in (2, 4, 5): raise argparse.ArgumentTypeError("bad interface %r" % v) try: ip_addr(ifp[0]) ifp[1] = port(ifp[1]) for fv in ifp[2:]: existing_file(fv) except argparse.ArgumentTypeError as ex: raise argparse.ArgumentTypeError("bad interface %r (%s)" % (v, ex)) return tuple(ifp) g.add_argument('--interface', dest='interfaces', default=[], action=ExtendAction, type=iface, nargs='+', metavar='IF_INFO', help= 'listen on specified interface, IF_INFO = ' 'IP_ADDR,PORT[,KEY_FILE,CERT_FILE[,CA_CERT_FILE]].' ' NOTE: this option can be used multiple times to provide additional ' 'interfaces to choose from but you can choose which one to listen to ' 'only using the GUI otherwise the first interface specified is used') def ifaces(v): # deprecated --interfaces 'if1;if2;...' if not v: raise argparse.ArgumentTypeError('empty argument') return [iface(i, ':') for i in v.split(';')] g.add_argument('--interfaces', # deprecated default=argparse.SUPPRESS, # do not set if absent action=ExtendAction, type=ifaces, help=argparse.SUPPRESS) # do not show on help g.add_argument('-d', '--pid_filename', '--pid-filename', default='httpserver.pid', metavar='FILE', help='server pid file (%(default)s)') g.add_argument('-l', '--log_filename', '--log-filename', default='httpserver.log', metavar='FILE', help='server log file (%(default)s)') g.add_argument('--min_threads', '--min-threads', '--minthreads', '-n', '--numthreads', # deprecated type=not_negative_int, metavar='NUM', help='minimum number of server threads') g.add_argument('--max_threads', '--max-threads', '--maxthreads', # deprecated type=not_negative_int, metavar='NUM', help='maximum number of server threads') g.add_argument('-q', '--request_queue_size', '--request-queue-size', default=5, type=not_negative_int, metavar='NUM', help= 'max number of queued requests when server busy (%(default)d)') g.add_argument('-o', '--timeout', default=10, type=not_negative_int, metavar='SECONDS', help='timeout for individual request (%(default)d seconds)') g.add_argument('--socket_timeout', '--socket-timeout', default=5, type=not_negative_int, metavar='SECONDS', help='timeout for socket (%(default)d seconds)') g.add_argument('-z', '--shutdown_timeout', # deprecated type=not_negative_int, help=argparse.SUPPRESS) # do not show on help g.add_argument('-F', '--profiler_dir', '--profiler-dir', '--profiler', # deprecated help='profiler directory') # scheduler options g = parser.add_argument_group('scheduler options') g.add_argument('-X', '--with_scheduler', '--with-scheduler', default=False, action='store_true', help= 'run schedulers alongside web server; require --K') def is_app(app): return is_appdir(folder, app) def scheduler(v): if not v: raise argparse.ArgumentTypeError('empty argument') if ',' in v: # legacy "app1,..." vl = [n.strip() for n in v.split(',')] return [scheduler(iv) for iv in vl] vp = [n.strip() for n in v.split(':')] app = vp[0] if not app: raise argparse.ArgumentTypeError('empty application') if not is_app(app): warn("argument -K/--scheduler: bad application %r, skipped" % app) return None return ':'.join(filter(None, vp)) g.add_argument('-K', '--scheduler', dest='schedulers', default=[], action=ExtendAction, type=scheduler, nargs='+', metavar='APP_INFO', help= 'run scheduler for the specified application(s), APP_INFO = ' 'APP_NAME[:GROUPS], that is an optional list of groups can follow ' 'the application name (e.g. app:group1:group2); require a scheduler ' "to be defined in the application's models. NOTE: this option can " 'be used multiple times to add schedulers') # cron options g = parser.add_argument_group('cron options') g.add_argument('-Y', '--with_cron', '--with-cron', '--run-cron', # deprecated default=False, action='store_true', help= 'run cron service alongside web server') def crontab(v): if not v: raise argparse.ArgumentTypeError('empty argument') if not is_app(v): warn("argument --crontab: bad application %r, skipped" % v) return None return v g.add_argument('--crontab', dest='crontabs', default=[], action=ExtendAction, type=crontab, nargs='+', metavar='APP_NAME', help= 'tell cron to read the crontab for the specified application(s) ' 'only, the default behaviour is to read the crontab for all of the ' 'installed applications. NOTE: this option can be used multiple ' 'times to build the list of crontabs to be processed by cron') def positive_int(v, err_label='value'): try: iv = int(v) if iv <= 0: raise ValueError() return iv except ValueError: pass raise argparse.ArgumentTypeError("bad %s %s" % (err_label, v)) def cron_threads(v): return positive_int(v, err_label='cron_threads') g.add_argument('--cron_threads', '--cron-threads', type=cron_threads, metavar='NUM', help='maximum number of cron threads (5)') g.add_argument('--soft_cron', '--soft-cron', '--softcron', # deprecated default=False, action='store_true', help= 'use cron software emulation instead of separate cron process; ' 'require -Y. NOTE: use of cron software emulation is strongly ' 'discouraged') g.add_argument('-C', '--cron_run', '--cron-run', '--cron', # deprecated default=False, action='store_true', help= 'trigger a cron run and exit; usually used when invoked ' 'from a system (external) crontab') g.add_argument('--cron_job', # NOTE: this is intended for internal use only default=False, action='store_true', help=argparse.SUPPRESS) # do not show on help # test options g = parser.add_argument_group('test options') g.add_argument('-v', '--verbose', default=False, action='store_true', help='increase verbosity') g.add_argument('-T', '--run_doctests', '--run-doctests', '--test', # deprecated metavar='APP_ENV', help= 'run doctests in application environment. APP_ENV like a/c/f (c, f ' 'optional)') g.add_argument('--run_system_tests', '--run-system-tests', default=False, action='store_true', help='run web2py test suite') g.add_argument('--with_coverage', '--with-coverage', default=False, action='store_true', help= 'collect coverage data when used with --run_system_tests; ' 'require Python 2.7+ and the coverage module installed') # other options g = parser.add_argument_group('other options') g.add_argument('-G', '--GAE', dest='gae', metavar='APP_NAME', help= 'will create app.yaml and gaehandler.py and exit') options = parse_args(parser, sys.argv[1:], deprecated_opts, integer_log_level) # make a copy of all options for global_settings copy_options = copy.deepcopy(options) copy_options.password = '******' global_settings.cmd_options = copy_options return options REGEX_PEP263 = r'^[ \t\f]*#.*?coding[:=][ \t]*([-_.a-zA-Z0-9]+)' def get_pep263_encoding(source): """ Read python source file encoding, according to PEP 263, see https://www.python.org/dev/peps/pep-0263/ """ with open(source, 'r') as sf: l12 = (sf.readline(), sf.readline()) m12 = re.match(REGEX_PEP263, l12[0]) or re.match(REGEX_PEP263, l12[1]) return m12 and m12.group(1) IGNORE = lambda: None def load_config(config_file, opt_map): """ Load options from config file (a Python script). config_file(str): file name opt_map(dict): mapping fom option name (key) to callable (val), used to post-process parsed value for the option Notice that the configuring Python script is never executed/imported, instead the ast library is used to evaluate each option assignment, provided that it is written on a single line. Returns an OrderedDict with sourced options. """ REGEX_ASSIGN_EXP = re.compile(r'\s*=\s*(.+)') map_items = opt_map.items() # preserve the order of loaded options even though this is not needed pl = OrderedDict() config_encoding = get_pep263_encoding(config_file) # NOTE: assume 'ascii' encoding when not explicitly stated (Python 2), # this is not correct for Python 3 where the default is 'utf-8' open_kwargs = dict() if PY2 else dict(encoding=config_encoding or 'ascii') with open(config_file, 'r', **open_kwargs) as cfil: for linenum, clin in enumerate(cfil, start=1): if PY2 and config_encoding: clin = unicode(clin, config_encoding) clin = clin.strip() for opt, mapr in map_items: if clin.startswith(opt): m = REGEX_ASSIGN_EXP.match(clin[len(opt):]) if m is None: continue try: val = opt_map[opt](ast.literal_eval(m.group(1))) except: die("cannot parse config file %r at line %d" % (config_file, linenum)) if val is not IGNORE: pl[opt] = val return pl def parse_args(parser, cli_args, deprecated_opts, integer_log_level, namespace=None): #print('PARSING ARGS:', cli_args) del integer_log_level[:] options = parser.parse_args(cli_args, namespace) #print('PARSED OPTIONS:', options) # warn for deprecated options deprecated_args = [a for a in cli_args if a in deprecated_opts] for da in deprecated_args: # verify if it was a real option by looking into # parsed values for the actual destination hint = deprecated_opts[da] dest = (hint or da).lstrip('-') default = parser.get_default(dest) if da == '--interfaces': hint = '--interface' if getattr(options, dest) is not default: # the option has been specified msg = "%s is deprecated" % da if hint: msg += ", use %s instead" % hint warn(msg) # warn for deprecated values if integer_log_level and '--debug' not in deprecated_args: warn('integer argument for -D/--log_level is deprecated, ' 'use label instead') # fix schedulers and die if all were skipped if None in options.schedulers: options.schedulers = [i for i in options.schedulers if i is not None] if not options.schedulers: die('no scheduler left') # fix crontabs and die if all were skipped if None in options.crontabs: options.crontabs = [i for i in options.crontabs if i is not None] if not options.crontabs: die('no crontab left') # taskbar if options.taskbar and os.name != 'nt': warn('--taskbar not supported on this platform, skipped') options.taskbar = False # options consistency checkings if options.run and not options.shell: die('-R/--run requires -S/--shell', exit_status=2) if options.args and not options.run: die('-A/--args requires -R/--run', exit_status=2) if options.with_scheduler and not options.schedulers: die('-X/--with_scheduler requires -K/--scheduler', exit_status=2) if options.soft_cron and not options.with_cron: die('--soft_cron requires -Y/--with_cron', exit_status=2) if options.shell: for o, os in dict(with_scheduler='-X/--with_scheduler', schedulers='-K/--scheduler', with_cron='-Y/--with_cron', cron_run='-C/--cron_run', run_doctests='-T/--run_doctests', run_system_tests='--run_system_tests').items(): if getattr(options, o): die("-S/--shell and %s are conflicting options" % os, exit_status=2) if options.bpython and options.plain: die('-B/--bpython and -P/--plain are conflicting options', exit_status=2) if options.cron_run: for o, os in dict(with_cron='-Y/--with_cron', run_doctests='-T/--run_doctests', run_system_tests='--run_system_tests').items(): if getattr(options, o): die("-C/--cron_run and %s are conflicting options" % os, exit_status=2) if options.run_doctests and options.run_system_tests: die('-T/--run_doctests and --run_system_tests are conflicting options', exit_status=2) if options.config: # load options from file, # all options sourced from file that evaluates to False # are skipped, the special IGNORE value is used for this store_true = lambda v: True if v else IGNORE str_or_default = lambda v : str(v) if v else IGNORE list_or_default = lambda v : ( [str(i) for i in v] if isinstance(v, list) else [str(v)]) if v \ else IGNORE # NOTE: 'help', 'version', 'folder', 'cron_job' and 'GAE' are not # sourced from file, the same applies to deprecated options opt_map = { # global options 'config': str_or_default, 'add_options': store_true, 'password': str_or_default, 'errors_to_console': store_true, 'no_banner': store_true, 'quiet': store_true, 'log_level': str_or_default, # GUI options 'no_gui': store_true, 'taskbar': store_true, # console options 'shell': str_or_default, 'bpython': store_true, 'plain': store_true, 'import_models': store_true, 'force_migrate': store_true, 'run': str_or_default, 'args': list_or_default, # web server options 'server_name': str_or_default, 'ip': str_or_default, 'port': str_or_default, 'server_key': str_or_default, 'server_cert': str_or_default, 'ca_cert': str_or_default, 'interface': list_or_default, 'pid_filename': str_or_default, 'log_filename': str_or_default, 'min_threads': str_or_default, 'max_threads': str_or_default, 'request_queue_size': str_or_default, 'timeout': str_or_default, 'socket_timeout': str_or_default, 'profiler_dir': str_or_default, # scheduler options 'with_scheduler': store_true, 'scheduler': list_or_default, # cron options 'with_cron': store_true, 'crontab': list_or_default, 'cron_threads': str_or_default, 'soft_cron': store_true, 'cron_run': store_true, # test options 'verbose': store_true, 'run_doctests': str_or_default, 'run_system_tests': store_true, 'with_coverage': store_true, } od = load_config(options.config, opt_map) #print("LOADED FROM %s:" % options.config, od) # convert loaded options dict as retuned by load_config # into a list of arguments for further parsing by parse_args file_args = []; args_args = [] # '--args' must be the last for key, val in od.items(): if key != 'args': file_args.append('--' + key) if isinstance(val, list): file_args.extend(val) elif not isinstance(val, bool): file_args.append(val) else: args_args = ['--args'] + val file_args += args_args if options.add_options: # add options to existing ones, # must clear config to avoid infinite recursion options.config = options.add_options = None return parse_args(parser, file_args, deprecated_opts, integer_log_level, options) return parse_args(parser, file_args, deprecated_opts, integer_log_level) return options