import logging
from StringIO import StringIO
import sys
from repoze.who.interfaces import IIdentifier
from repoze.who.interfaces import IAuthenticator
from repoze.who.interfaces import IChallenger
from repoze.who.interfaces import IMetadataProvider
_STARTED = '-- repoze.who request started (%s) --'
_ENDED = '-- repoze.who request ended (%s) --'
class PluggableAuthenticationMiddleware(object):
def __init__(self, app,
identifiers,
authenticators,
challengers,
mdproviders,
classifier,
challenge_decider,
log_stream = None,
log_level = logging.INFO,
remote_user_key = 'REMOTE_USER',
):
iregistry, nregistry = make_registries(identifiers, authenticators,
challengers, mdproviders)
self.registry = iregistry
self.name_registry = nregistry
self.app = app
self.classifier = classifier
self.challenge_decider = challenge_decider
self.remote_user_key = remote_user_key
self.logger = None
if isinstance(log_stream, logging.Logger):
self.logger = log_stream
elif log_stream:
handler = logging.StreamHandler(log_stream)
fmt = '%(asctime)s %(message)s'
formatter = logging.Formatter(fmt)
handler.setFormatter(formatter)
self.logger = logging.Logger('repoze.who')
self.logger.addHandler(handler)
self.logger.setLevel(log_level)
def __call__(self, environ, start_response):
if self.remote_user_key in environ:
# act as a pass through if REMOTE_USER (or whatever) is
# already set
return self.app(environ, start_response)
path_info = environ.get('PATH_INFO', None)
environ['repoze.who.plugins'] = self.name_registry
environ['repoze.who.logger'] = self.logger
environ['repoze.who.application'] = self.app
logger = self.logger
logger and logger.info(_STARTED % path_info)
classification = self.classifier(environ)
logger and logger.info('request classification: %s' % classification)
userid = None
identity = None
identifier = None
ids = self.identify(environ, classification)
# ids will be list of tuples: [ (IIdentifier, identity) ]
if ids:
auth_ids = self.authenticate(environ, classification, ids)
# auth_ids will be a list of five-tuples in the form
# ( (auth_rank, id_rank), authenticator, identifier, identity,
# userid )
#
# When sorted, its first element will represent the "best"
# identity for this request.
if auth_ids:
auth_ids.sort()
best = auth_ids[0]
rank, authenticator, identifier, identity, userid = best
identity = Identity(identity) # dont show contents at print
# allow IMetadataProvider plugins to scribble on the identity
self.add_metadata(environ, classification, identity)
# add the identity to the environment; a downstream
# application can mutate it to do an 'identity reset'
# as necessary, e.g. identity['login'] = 'foo',
# identity['password'] = 'bar'
environ['repoze.who.identity'] = identity
# set the REMOTE_USER
environ[self.remote_user_key] = userid
else:
logger and logger.info('no identities found, not authenticating')
# allow identifier plugins to replace the downstream
# application (to do redirection and unauthorized themselves
# mostly)
app = environ.pop('repoze.who.application')
if app is not self.app:
logger and logger.info(
'static downstream application replaced with %s' % app)
wrapper = StartResponseWrapper(start_response)
app_iter = app(environ, wrapper.wrap_start_response)
# The challenge decider almost(?) always needs information from the
# response. The WSGI spec (PEP 333) states that a WSGI application
# must call start_response by the iterable's first iteration. If
# start_response hasn't been called, we'll wrap it in a way that
# triggers that call.
if not wrapper.called:
app_iter = wrap_generator(app_iter)
if self.challenge_decider(environ, wrapper.status, wrapper.headers):
logger and logger.info('challenge required')
challenge_app = self.challenge(
environ,
classification,
wrapper.status,
wrapper.headers,
identifier,
identity
)
if challenge_app is not None:
logger and logger.info('executing challenge app')
if app_iter:
list(app_iter) # unwind the original app iterator
# replace the downstream app with the challenge app
app_iter = challenge_app(environ, start_response)
else:
logger and logger.info('configuration error: no challengers')
raise RuntimeError('no challengers found')
else:
logger and logger.info('no challenge required')
remember_headers = []
if identifier:
remember_headers = identifier.remember(environ, identity)
if remember_headers:
logger and logger.info('remembering via headers from %s: %s'
% (identifier, remember_headers))
wrapper.finish_response(remember_headers)
logger and logger.info(_ENDED % path_info)
return app_iter
def identify(self, environ, classification):
logger = self.logger
candidates = self.registry.get(IIdentifier, ())
logger and self.logger.info('identifier plugins registered %s' %
(candidates,))
plugins = match_classification(IIdentifier, candidates, classification)
logger and self.logger.info(
'identifier plugins matched for '
'classification "%s": %s' % (classification, plugins))
results = []
for plugin in plugins:
identity = plugin.identify(environ)
if identity is not None:
logger and logger.debug(
'identity returned from %s: %s' % (plugin, identity))
results.append((plugin, identity))
else:
logger and logger.debug(
'no identity returned from %s (%s)' % (plugin, identity))
logger and logger.debug('identities found: %s' % (results,))
return results
def add_metadata(self, environ, classification, identity):
candidates = self.registry.get(IMetadataProvider, ())
plugins = match_classification(IMetadataProvider, candidates,
classification)
for plugin in plugins:
plugin.add_metadata(environ, identity)
def authenticate(self, environ, classification, identities):
logger = self.logger
candidates = self.registry.get(IAuthenticator, [])
logger and self.logger.info('authenticator plugins registered %s' %
candidates)
plugins = match_classification(IAuthenticator, candidates,
classification)
logger and self.logger.info(
'authenticator plugins matched for '
'classification "%s": %s' % (classification, plugins))
# 'preauthenticated' identities are considered best-ranking
identities, results, id_rank_start =self._filter_preauthenticated(
identities)
auth_rank = 0
for plugin in plugins:
identifier_rank = id_rank_start
for identifier, identity in identities:
userid = plugin.authenticate(environ, identity)
if userid is not None:
logger and logger.debug(
'userid returned from %s: "%s"' % (plugin, userid))
# stamp the identity with the userid
identity['repoze.who.userid'] = userid
rank = (auth_rank, identifier_rank)
results.append(
(rank, plugin, identifier, identity, userid)
)
else:
logger and logger.debug(
'no userid returned from %s: (%s)' % (
plugin, userid))
identifier_rank += 1
auth_rank += 1
logger and logger.debug('identities authenticated: %s' % (results,))
return results
def _filter_preauthenticated(self, identities):
logger = self.logger
results = []
new_identities = identities[:]
identifier_rank = 0
for thing in identities:
identifier, identity = thing
userid = identity.get('repoze.who.userid')
if userid is not None:
# the identifier plugin has already authenticated this
# user (domain auth, auth ticket, etc)
logger and logger.info(
'userid preauthenticated by %s: "%s" '
'(repoze.who.userid set)' % (identifier, userid)
)
rank = (0, identifier_rank)
results.append(
(rank, None, identifier, identity, userid)
)
identifier_rank += 1
new_identities.remove(thing)
return new_identities, results, identifier_rank
def challenge(self, environ, classification, status, app_headers,
identifier, identity):
# happens on egress
logger = self.logger
forget_headers = []
if identifier:
forget_headers = identifier.forget(environ, identity)
if forget_headers is None:
forget_headers = []
else:
logger and logger.info('forgetting via headers from %s: %s'
% (identifier, forget_headers))
candidates = self.registry.get(IChallenger, ())
logger and logger.info('challengers registered: %s' % candidates)
plugins = match_classification(IChallenger,
candidates, classification)
logger and logger.info('challengers matched for '
'classification "%s": %s' % (classification,
plugins))
for plugin in plugins:
app = plugin.challenge(environ, status, app_headers,
forget_headers)
if app is not None:
# new WSGI application
logger and logger.info(
'challenger plugin %s "challenge" returned an app' % (
plugin))
return app
# signifies no challenge
logger and logger.info('no challenge app returned')
return None
def wrap_generator(result):
"""\
This function returns a generator that behaves exactly the same as the
original. It's only difference is it pulls the first iteration off and
caches it to trigger any immediate side effects (in a WSGI world, this
ensures start_response is called).
"""
# Neat trick to pull the first iteration only. We need to do this outside
# of the generator function to ensure it is called.
for iter in result:
first = iter
break
# Wrapper yields the first iteration, then passes result's iterations
# directly up.
def wrapper():
yield first
for iter in result:
# We'll let result's StopIteration bubble up directly.
yield iter
return wrapper()
def match_classification(iface, plugins, classification):
result = []
for plugin in plugins:
plugin_classifications = getattr(plugin, 'classifications', {})
iface_classifications = plugin_classifications.get(iface)
if not iface_classifications: # good for any
result.append(plugin)
continue
if classification in iface_classifications:
result.append(plugin)
return result
class StartResponseWrapper(object):
def __init__(self, start_response):
self.start_response = start_response
self.status = None
self.headers = []
self.exc_info = None
self.buffer = StringIO()
# A WSGI app may delay calling start_response until the first iteration
# of its generator. We track this so we know whether or not we need to
# trigger an iteration before examining the response.
self.called = False
def wrap_start_response(self, status, headers, exc_info=None):
self.headers = headers
self.status = status
self.exc_info = exc_info
# The response has been initiated, so we have a valid code.
self.called = True
return self.buffer.write
def finish_response(self, extra_headers):
if not extra_headers:
extra_headers = []
headers = self.headers + extra_headers
write = self.start_response(self.status, headers, self.exc_info)
if write:
self.buffer.seek(0)
value = self.buffer.getvalue()
if value:
write(value)
if hasattr(write, 'close'):
write.close()
def make_test_middleware(app, global_conf):
""" Functionally equivalent to
[plugin:form]
use = repoze.who.plugins.form.FormPlugin
rememberer_name = cookie
login_form_qs=__do_login
[plugin:cookie]
use = repoze.who.plugins.cookie:InsecureCookiePlugin
cookie_name = oatmeal
[plugin:basicauth]
use = repoze.who.plugins.basicauth.BasicAuthPlugin
realm = repoze.who
[plugin:htpasswd]
use = repoze.who.plugins.htpasswd.HTPasswdPlugin
filename = <...>
check_fn = repoze.who.plugins.htpasswd:crypt_check
[general]
request_classifier = repoze.who.classifiers:default_request_classifier
challenge_decider = repoze.who.classifiers:default_challenge_decider
[identifiers]
plugins = form:browser cookie basicauth
[authenticators]
plugins = htpasswd
[challengers]
plugins = form:browser basicauth
"""
# be able to test without a config file
from repoze.who.plugins.basicauth import BasicAuthPlugin
from repoze.who.plugins.auth_tkt import AuthTktCookiePlugin
from repoze.who.plugins.cookie import InsecureCookiePlugin
from repoze.who.plugins.form import FormPlugin
from repoze.who.plugins.htpasswd import HTPasswdPlugin
io = StringIO()
salt = 'aa'
for name, password in [ ('admin', 'admin'), ('chris', 'chris') ]:
io.write('%s:%s\n' % (name, password))
io.seek(0)
def cleartext_check(password, hashed):
return password == hashed #pragma NO COVERAGE
htpasswd = HTPasswdPlugin(io, cleartext_check)
basicauth = BasicAuthPlugin('repoze.who')
auth_tkt = AuthTktCookiePlugin('secret', 'auth_tkt')
cookie = InsecureCookiePlugin('oatmeal')
form = FormPlugin('__do_login', rememberer_name='auth_tkt')
form.classifications = { IIdentifier:['browser'],
IChallenger:['browser'] } # only for browser
identifiers = [('form', form),('auth_tkt',auth_tkt),('basicauth',basicauth)]
authenticators = [('htpasswd', htpasswd)]
challengers = [('form',form), ('basicauth',basicauth)]
mdproviders = []
from repoze.who.classifiers import default_request_classifier
from repoze.who.classifiers import default_challenge_decider
log_stream = None
import os
if os.environ.get('WHO_LOG'):
log_stream = sys.stdout
middleware = PluggableAuthenticationMiddleware(
app,
identifiers,
authenticators,
challengers,
mdproviders,
default_request_classifier,
default_challenge_decider,
log_stream = log_stream,
log_level = logging.DEBUG
)
return middleware
def verify(plugin, iface):
from zope.interface.verify import verifyObject
verifyObject(iface, plugin, tentative=True)
def make_registries(identifiers, authenticators, challengers, mdproviders):
from zope.interface.verify import BrokenImplementation
interface_registry = {}
name_registry = {}
for supplied, iface in [ (identifiers, IIdentifier),
(authenticators, IAuthenticator),
(challengers, IChallenger),
(mdproviders, IMetadataProvider)]:
for name, value in supplied:
try:
verify(value, iface)
except BrokenImplementation, why:
why = str(why)
raise ValueError(str(name) + ': ' + why)
L = interface_registry.setdefault(iface, [])
L.append(value)
name_registry[name] = value
return interface_registry, name_registry
class Identity(dict):
""" dict subclass that prevents its members from being rendered
during print """
def __repr__(self):
return '