# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
import pkg_resources
import sys
import optparse
import bool_optparse
import os
import re
import textwrap
import pluginlib
import ConfigParser
import getpass
try:
import subprocess
except ImportError:
from paste.script.util import subprocess24 as subprocess
difflib = None
if sys.version_info >= (2, 6):
from logging.config import fileConfig
else:
# Use our custom fileConfig -- 2.5.1's with a custom Formatter class
# and less strict whitespace (which were incorporated into 2.6's)
from paste.script.util.logging_config import fileConfig
class BadCommand(Exception):
    """
    Raised when a command cannot proceed because of how it was invoked.

    ``message`` is the text shown to the user and ``exit_code`` is the
    process exit status that should be used (2 unless given).
    """

    def __init__(self, message, exit_code=2):
        # Stored explicitly so callers can always read ``e.message``
        # and ``e.exit_code`` directly.
        self.message = message
        self.exit_code = exit_code
        Exception.__init__(self, message)
class NoDefault(object):
    """
    Marker class meaning "no default value was supplied".

    It is compared by identity (``default is not NoDefault``), which
    lets ``None`` remain usable as a real default.
    """
# The installed PasteScript distribution; used for the --version text.
dist = pkg_resources.get_distribution('PasteScript')

# First line of ``sys.version``, stripped of surrounding whitespace.
python_version = sys.version.splitlines()[0].strip()

# Top-level option parser for the options that come *before* the
# command name.  The built-in help option is disabled so that
# -h/--help can be handled by run() below.
parser = optparse.OptionParser(add_help_option=False,
                               version='%s from %s (python %s)'
                               % (dist, dist.location, python_version),
                               usage='%prog [paster_options] COMMAND [command_options]')

parser.add_option(
    '--plugin',
    action='append',
    dest='plugins',
    help="Add a plugin to the list of commands (plugins are Egg specs; will also require() the Egg)")
parser.add_option(
    '-h', '--help',
    action='store_true',
    dest='do_help',
    help="Show this help message")
# Stop option parsing at the first positional argument (the command
# name) so that the command's own options are left untouched.
parser.disable_interspersed_args()

# @@: Add an option to run this in another Python interpreter

# Plugins requested with --plugin; extended by run() and read by
# get_commands().
system_plugins = []
def run(args=None):
    """
    Entry point: parse the global options, find the command and invoke it.

    ``args`` is the argument list without the program name; when it is
    ``None`` the arguments are taken from ``sys.argv[1:]``.  This
    function does not return normally, because ``invoke()`` finishes
    with ``sys.exit()``.
    """
    if not args and len(sys.argv) >= 2:
        launcher = os.environ.get('_')
        if (launcher
            and sys.argv[0] != launcher
            and launcher == sys.argv[1]):
            # probably it's an exe execution
            args = ['exe', launcher] + sys.argv[2:]
    if args is None:
        args = sys.argv[1:]

    options, args = parser.parse_args(args)
    options.base_parser = parser
    system_plugins.extend(options.plugins or [])
    commands = get_commands()

    # --help is turned into the "help" command, keeping any remaining
    # arguments as its arguments.
    if options.do_help:
        args = ['help'] + args
    if not args:
        print('Usage: %s COMMAND' % sys.argv[0])
        args = ['help']

    command_name = args[0]
    command_args = args[1:]
    if command_name in commands:
        command = commands[command_name].load()
    else:
        command = NotFoundCommand
    invoke(command, command_name, options, command_args)
def parse_exe_file(config):
    """
    Read the ``[exe]`` section of the file ``config`` and build an
    argument list from it.

    Recognized options in the section:

    ``command``
        The command name (``'exe'`` if not given).
    ``options``
        Extra arguments, split with shell-like quoting rules.
    ``sys.path``
        Paths (shell-like quoting, relative to the directory of
        ``config``) that are added both to the ``pkg_resources``
        working set and to the front of ``sys.path``.

    Returns ``[command_name, config] + options``.
    """
    import shlex
    section = 'exe'
    cfg = ConfigParser.RawConfigParser()
    cfg.read([config])

    command_name = 'exe'
    if cfg.has_option(section, 'command'):
        command_name = cfg.get(section, 'command')

    options = []
    if cfg.has_option(section, 'options'):
        options = shlex.split(cfg.get(section, 'options'))

    if cfg.has_option(section, 'sys.path'):
        config_dir = os.path.dirname(config)
        for entry in shlex.split(cfg.get(section, 'sys.path')):
            path = os.path.abspath(os.path.join(config_dir, entry))
            pkg_resources.working_set.add_entry(path)
            sys.path.insert(0, path)

    return [command_name, config] + options
def get_commands():
    """
    Collect the commands available to this invocation.

    Starts from a copy of ``system_plugins``.  If
    ``pluginlib.find_egg_info_dir()`` finds an egg-info directory for
    the current working directory, that project is added as a plugin
    and its base directory is made importable.  Returns the commands
    loaded from the resolved plugins, updated with the global commands.
    """
    plugins = list(system_plugins)
    egg_info_dir = pluginlib.find_egg_info_dir(os.getcwd())
    if egg_info_dir:
        # "Foo.egg-info" -> "Foo"
        egg_name = os.path.splitext(os.path.basename(egg_info_dir))[0]
        plugins.append(egg_name)
        base_dir = os.path.dirname(egg_info_dir)
        if base_dir not in sys.path:
            sys.path.insert(0, base_dir)
            pkg_resources.working_set.add_entry(base_dir)
    resolved = pluginlib.resolve_plugins(plugins)
    commands = pluginlib.load_commands_from_plugins(resolved)
    commands.update(pluginlib.load_global_commands())
    return commands
def invoke(command, command_name, options, args):
    """
    Instantiate ``command`` with ``command_name``, run it with ``args``
    and exit the process with the resulting code.

    A ``BadCommand`` raised while creating or running the command is
    reported by printing its message, and its ``exit_code`` becomes the
    process exit code.  ``options`` is accepted but not used here.
    """
    try:
        exit_code = command(command_name).run(args)
    except BadCommand:
        # Fetch the active exception through sys.exc_info() so this
        # does not depend on version-specific ``except`` syntax.
        error = sys.exc_info()[1]
        print(error.message)
        exit_code = error.exit_code
    sys.exit(exit_code)
class Command(object):
def __init__(self, name):
self.command_name = name
# Positional-argument limits checked by run(); None means "no limit".
# The error templates are %-formatted with a dict that provides the
# limit itself (max_args / min_args) and 'actual_args'.
max_args = None
max_args_error = 'You must provide no more than %(max_args)s arguments'
min_args = None
min_args_error = 'You must provide at least %(min_args)s arguments'
# If this command takes a configuration file, set this to 1 or -1
# Then if invoked through #! the config file will be put into the positional
# arguments -- at the beginning with 1, at the end with -1
takes_config_file = None
# Grouped in help messages by this:
group_name = ''
# Sequence of (option attribute name, option name for the message)
# pairs; run() raises BadCommand when one of the named options is
# missing or false.  Must stay iterable, so the default is an empty
# tuple rather than None.
required_args = ()
# Optional longer help text; dedented and handed to the option parser
# by parse_args().
description = None
# Appended to "%prog [options]" in the usage line by parse_args().
usage = ''
hidden = False
# This is the default verbosity level; --quiet subtracts,
# --verbose adds:
default_verbosity = 0
# This is the default interactive state:
default_interactive = 0
# Returned by run() when command() returns None.
return_code = 0
# Makes the exception reachable as ``self.BadCommand``.
BadCommand = BadCommand
# Must define:
# parser
# summary
# command()
def run(self, args):
self.parse_args(args)
# Setup defaults:
for name, default in [('verbose', 0),
('quiet', 0),
('interactive', False),
('overwrite', False)]:
if not hasattr(self.options, name):
setattr(self.options, name, default)
if getattr(self.options, 'simulate', False):
self.options.verbose = max(self.options.verbose, 1)
self.interactive = self.default_interactive
if getattr(self.options, 'interactive', False):
self.interactive += self.options.interactive
if getattr(self.options, 'no_interactive', False):
self.interactive = False
self.verbose = self.default_verbosity
self.verbose += self.options.verbose
self.verbose -= self.options.quiet
self.simulate = getattr(self.options, 'simulate', False)
# For #! situations:
if (os.environ.get('PASTE_CONFIG_FILE')
and self.takes_config_file is not None):
take = self.takes_config_file
filename = os.environ.get('PASTE_CONFIG_FILE')
if take == 1:
self.args.insert(0, filename)
elif take == -1:
self.args.append(filename)
else:
assert 0, (
"Value takes_config_file must be None, 1, or -1 (not %r)"
% take)
if (os.environ.get('PASTE_DEFAULT_QUIET')):
self.verbose = 0
# Validate:
if self.min_args is not None and len(self.args) < self.min_args:
raise BadCommand(
self.min_args_error % {'min_args': self.min_args,
'actual_args': len(self.args)})
if self.max_args is not None and len(self.args) > self.max_args:
raise BadCommand(
self.max_args_error % {'max_args': self.max_args,
'actual_args': len(self.args)})
for var_name, option_name in self.required_args:
if not getattr(self.options, var_name, None):
raise BadCommand(
'You must provide the option %s' % option_name)
result = self.command()
if result is None:
return self.return_code
else:
return result
def parse_args(self, args):
if self.usage:
usage = ' '+self.usage
else:
usage = ''
self.parser.usage = "%%prog [options]%s\n%s" % (
usage, self.summary)
self.parser.prog = '%s %s' % (sys.argv[0], self.command_name)
if self.description:
desc = self.description
desc = textwrap.dedent(desc)
self.parser.description = desc
self.options, self.args = self.parser.parse_args(args)
########################################
## Utility methods
########################################
def here(cls):
mod = sys.modules[cls.__module__]
return os.path.dirname(mod.__file__)
here = classmethod(here)
def ask(self, prompt, safe=False, default=True):
"""
Prompt the user. Default can be true, false, ``'careful'`` or
``'none'``. If ``'none'`` then the user must enter y/n. If
``'careful'`` then the user must enter yes/no (long form).
If the interactive option is over two (``-ii``) then ``safe``
will be used as a default. This option should be the
do-nothing option.
"""
# @@: Should careful be a separate argument?
if self.options.interactive >= 2:
default = safe
if default == 'careful':
prompt += ' [yes/no]?'
elif default == 'none':
prompt += ' [y/n]?'
elif default:
prompt += ' [Y/n]? '
else:
prompt += ' [y/N]? '
while 1:
response = raw_input(prompt).strip().lower()
if not response:
if default in ('careful', 'none'):
print 'Please enter yes or no'
continue
return default
if default == 'careful':
if response in ('yes', 'no'):
return response == 'yes'
print 'Please enter "yes" or "no"'
continue
if response[0].lower() in ('y', 'n'):
return response[0].lower() == 'y'
print 'Y or N please'
def challenge(self, prompt, default=NoDefault, should_echo=True):
    """
    Prompt the user for a value and return the stripped response.

    If ``default`` is given it is shown in the prompt and returned for
    an empty response; without a default the prompt is repeated until
    something is entered.  When ``should_echo`` is false the input is
    read with ``getpass.getpass`` instead of ``raw_input``.
    """
    has_default = default is not NoDefault
    if has_default:
        prompt += ' [%r]' % default
    prompt += ': '

    if should_echo:
        read_response = raw_input
    else:
        read_response = getpass.getpass

    while 1:
        response = read_response(prompt).strip()
        if response:
            return response
        if has_default:
            return default
def pad(self, s, length, dir='left'):
    """
    Pad ``s`` with spaces to ``length`` characters.

    With ``dir='left'`` the text stays on the left (spaces are
    appended); any other value puts the spaces in front.  Strings that
    are already ``length`` or longer are returned unchanged.
    """
    missing = length - len(s)
    if missing <= 0:
        return s
    filler = ' ' * missing
    if dir == 'left':
        return s + filler
    return filler + s
def standard_parser(cls, verbose=True,
                    interactive=False,
                    no_interactive=False,
                    simulate=False,
                    quiet=False,
                    overwrite=False):
    """
    Create a standard ``OptionParser`` instance.

    Each true argument adds the corresponding option to the parser.

    Typically used like::

        class MyCommand(Command):
            parser = Command.standard_parser()

    Subclasses may redefine ``standard_parser``, so use the
    nearest superclass's class method.
    """
    # (enabled, option strings, add_option keywords), listed in the
    # order in which the options are registered.
    option_table = [
        (verbose, ('-v', '--verbose'),
         {'action': 'count', 'dest': 'verbose', 'default': 0}),
        (quiet, ('-q', '--quiet'),
         {'action': 'count', 'dest': 'quiet', 'default': 0}),
        (no_interactive, ('--no-interactive',),
         {'action': "count", 'dest': "no_interactive", 'default': 0}),
        (interactive, ('-i', '--interactive'),
         {'action': 'count', 'dest': 'interactive', 'default': 0}),
        (simulate, ('-n', '--simulate'),
         {'action': 'store_true', 'dest': 'simulate', 'default': False}),
        (overwrite, ('-f', '--overwrite'),
         {'dest': "overwrite",
          'action': "store_true",
          'help': "Overwrite files (warnings will be emitted for non-matching files otherwise)"}),
    ]
    parser = bool_optparse.BoolOptionParser()
    for enabled, option_strings, settings in option_table:
        if enabled:
            parser.add_option(*option_strings, **settings)
    return parser
standard_parser = classmethod(standard_parser)
def shorten(self, fn, *paths):
    """
    Return a shortened form of the filename (relative to the current
    directory), typically for displaying in messages.  If
    ``*paths`` are present, then use os.path.join to create the
    full filename before shortening.

    Filenames that are not inside the current directory are returned
    unchanged.  The current directory itself is returned as ``''``.
    """
    if paths:
        fn = os.path.join(fn, *paths)
    cwd = os.getcwd()
    if fn == cwd:
        return ''
    # Require a separator after the current directory so that a sibling
    # such as "/home/foobar" is not treated as being inside "/home/foo".
    # The rstrip keeps this correct when the current directory is the
    # filesystem root, which already ends with a separator.
    prefix = cwd.rstrip(os.path.sep) + os.path.sep
    if fn.startswith(prefix):
        return fn[len(prefix):].lstrip(os.path.sep)
    return fn
def ensure_dir(self, dir, svn_add=True):
"""
Ensure that the directory exists, creating it if necessary.
Respects verbosity and simulation.
Adds directory to subversion if ``.svn/`` directory exists in
parent, and directory was created.
"""
dir = dir.rstrip(os.sep)
if not dir:
# we either reached the parent-most directory, or we got
# a relative directory
# @@: Should we make sure we resolve relative directories
# first? Though presumably the current directory always
# exists.
return
if not os.path.exists(dir):
self.ensure_dir(os.path.dirname(dir))
if self.verbose:
print 'Creating %s' % self.shorten(dir)
if not self.simulate:
os.mkdir(dir)
if (svn_add and
os.path.exists(os.path.join(os.path.dirname(dir), '.svn'))):
self.svn_command('add', dir)
else:
if self.verbose > 1:
print "Directory already exists: %s" % self.shorten(dir)
def ensure_file(self, filename, content, svn_add=True):
"""
Ensure a file named ``filename`` exists with the given
content. If ``--interactive`` has been enabled, this will ask
the user what to do if a file exists with different content.
"""
global difflib
assert content is not None, (
"You cannot pass a content of None")
self.ensure_dir(os.path.dirname(filename), svn_add=svn_add)
if not os.path.exists(filename):
if self.verbose:
print 'Creating %s' % filename
if not self.simulate:
f = open(filename, 'wb')
f.write(content)
f.close()
if svn_add and os.path.exists(os.path.join(os.path.dirname(filename), '.svn')):
self.svn_command('add', filename,
warn_returncode=True)
return
f = open(filename, 'rb')
old_content = f.read()
f.close()
if content == old_content:
if self.verbose > 1:
print 'File %s matches expected content' % filename
return
if not self.options.overwrite:
print 'Warning: file %s does not match expected content' % filename
if difflib is None:
import difflib
diff = difflib.context_diff(
content.splitlines(),
old_content.splitlines(),
'expected ' + filename,
filename)
print '\n'.join(diff)
if self.interactive:
while 1:
s = raw_input(
'Overwrite file with new content? [y/N] ').strip().lower()
if not s:
s = 'n'
if s.startswith('y'):
break
if s.startswith('n'):
return
print 'Unknown response; Y or N please'
else:
return
if self.verbose:
print 'Overwriting %s with new content' % filename
if not self.simulate:
f = open(filename, 'wb')
f.write(content)
f.close()
def insert_into_file(self, filename, marker_name, text,
indent=False):
"""
Inserts ``text`` into the file, right after the given marker.
Markers look like: ``-*-