from cStringIO import StringIO import sys import cgi import urllib import urlparse import re import textwrap from Cookie import BaseCookie from rfc822 import parsedate_tz, mktime_tz, formatdate from datetime import datetime, date, timedelta, tzinfo import time import calendar import tempfile import warnings from webob.datastruct import EnvironHeaders from webob.multidict import MultiDict, UnicodeMultiDict, NestedMultiDict, NoVars from webob.etag import AnyETag, NoETag, ETagMatcher, IfRange, NoIfRange from webob.headerdict import HeaderDict from webob.statusreasons import status_reasons from webob.cachecontrol import CacheControl, serialize_cache_control from webob.acceptparse import Accept, MIMEAccept, NilAccept, MIMENilAccept, NoAccept from webob.byterange import Range, ContentRange try: sorted except NameError: from webob.compat import sorted _CHARSET_RE = re.compile(r';\s*charset=([^;]*)', re.I) _SCHEME_RE = re.compile(r'^[a-z]+:', re.I) _PARAM_RE = re.compile(r'([a-z0-9]+)=(?:"([^"]*)"|([a-z0-9_.-]*))', re.I) _OK_PARAM_RE = re.compile(r'^[a-z0-9_.-]+$', re.I) __all__ = ['Request', 'Response', 'UTC', 'day', 'week', 'hour', 'minute', 'second', 'month', 'year', 'html_escape'] class _UTC(tzinfo): def dst(self, dt): return timedelta(0) def utcoffset(self, dt): return timedelta(0) def tzname(self, dt): return 'UTC' def __repr__(self): return 'UTC' UTC = _UTC() def html_escape(s): """HTML-escape a string or object This converts any non-string objects passed into it to strings (actually, using ``unicode()``). All values returned are non-unicode strings (using ``&#num;`` entities for all non-ASCII characters). None is treated specially, and returns the empty string. """ if s is None: return '' if hasattr(s, '__html__'): return s.__html__() if not isinstance(s, basestring): if hasattr(s, '__unicode__'): s = unicode(s) else: s = str(s) s = cgi.escape(s, True) if isinstance(s, unicode): s = s.encode('ascii', 'xmlcharrefreplace') return s def timedelta_to_seconds(td): """ Converts a timedelta instance to seconds. """ return td.seconds + (td.days*24*60*60) day = timedelta(days=1) week = timedelta(weeks=1) hour = timedelta(hours=1) minute = timedelta(minutes=1) second = timedelta(seconds=1) # Estimate, I know; good enough for expirations month = timedelta(days=30) year = timedelta(days=365) class _NoDefault: def __repr__(self): return '(No Default)' NoDefault = _NoDefault() class environ_getter(object): """For delegating an attribute to a key in self.environ.""" def __init__(self, key, default='', default_factory=None, settable=True, deletable=True, doc=None, rfc_section=None): self.key = key self.default = default self.default_factory = default_factory self.settable = settable self.deletable = deletable docstring = "Gets" if self.settable: docstring += " and sets" if self.deletable: docstring += " and deletes" docstring += " the %r key from the environment." % self.key docstring += _rfc_reference(self.key, rfc_section) if doc: docstring += '\n\n' + textwrap.dedent(doc) self.__doc__ = docstring def __get__(self, obj, type=None): if obj is None: return self if self.key not in obj.environ: if self.default_factory: val = obj.environ[self.key] = self.default_factory() return val else: return self.default return obj.environ[self.key] def __set__(self, obj, value): if not self.settable: raise AttributeError("Read-only attribute (key %r)" % self.key) if value is None: if self.key in obj.environ: del obj.environ[self.key] else: obj.environ[self.key] = value def __delete__(self, obj): if not self.deletable: raise AttributeError("You cannot delete the key %r" % self.key) del obj.environ[self.key] def __repr__(self): return '' % self.key class header_getter(object): """For delegating an attribute to a header in self.headers""" def __init__(self, header, default=None, settable=True, deletable=True, doc=None, rfc_section=None): self.header = header self.default = default self.settable = settable self.deletable = deletable docstring = "Gets" if self.settable: docstring += " and sets" if self.deletable: docstring += " and deletes" docstring += " they header %s from the headers" % self.header docstring += _rfc_reference(self.header, rfc_section) if doc: docstring += '\n\n' + textwrap.dedent(doc) self.__doc__ = docstring def __get__(self, obj, type=None): if obj is None: return self if self.header not in obj.headers: return self.default else: return obj.headers[self.header] def __set__(self, obj, value): if not self.settable: raise AttributeError("Read-only attribute (header %s)" % self.header) if value is None: if self.header in obj.headers: del obj.headers[self.header] else: if isinstance(value, unicode): # This is the standard encoding for headers: value = value.encode('ISO-8859-1') obj.headers[self.header] = value def __delete__(self, obj): if not self.deletable: raise AttributeError("You cannot delete the header %s" % self.header) del obj.headers[self.header] def __repr__(self): return '' % self.header class set_via_call(object): def __init__(self, func, adapt_args=None): self.func = func self.adapt_args = adapt_args def __get__(self, obj, type=None): return self.__class__(self.func.__get__(obj, type)) def __set__(self, obj, value): if self.adapt_args is None: args, kw = (value,), {} else: result = self.adapt_args(value) if result is None: return args, kw = result self.func(obj, *args, **kw) def __repr__(self): return 'set_via_call(%r)' % self.func def __call__(self, *args, **kw): return self.func(*args, **kw) def _adapt_cache_expires(value): if value is False: return None if value is True: return (0,), {} else: return (value,), {} class converter(object): """ Wraps a decorator, and applies conversion for that decorator """ def __init__(self, decorator, getter_converter, setter_converter, convert_name=None, doc=None, converter_args=()): self.decorator = decorator self.getter_converter = getter_converter self.setter_converter = setter_converter self.convert_name = convert_name self.converter_args = converter_args docstring = decorator.__doc__ or '' docstring += " Converts it as a " if convert_name: docstring += convert_name + '.' else: docstring += "%r and %r." % (getter_converter, setter_converter) if doc: docstring += '\n\n' + textwrap.dedent(doc) self.__doc__ = docstring def __get__(self, obj, type=None): if obj is None: return self value = self.decorator.__get__(obj, type) return self.getter_converter(value, *self.converter_args) def __set__(self, obj, value): value = self.setter_converter(value, *self.converter_args) self.decorator.__set__(obj, value) def __delete__(self, obj): self.decorator.__delete__(obj) def __repr__(self): if self.convert_name: name = ' %s' % self.convert_name else: name = '' return '' % (self.decorator, name) def _rfc_reference(header, section): if not section: return '' major_section = section.split('.')[0] link = 'http://www.w3.org/Protocols/rfc2616/rfc2616-sec%s.html#sec%s' % ( major_section, section) if header.startswith('HTTP_'): header = header[5:].title().replace('_', '-') return " For more information on %s see `section %s <%s>`_." % ( header, section, link) class deprecated_property(object): """ Wraps a decorator, with a deprecation warning or error """ def __init__(self, decorator, attr, message, warning=True): self.decorator = decorator self.attr = attr self.message = message self.warning = warning def __get__(self, obj, type=None): if obj is None: return self self.warn() return self.decorator.__get__(obj, type) def __set__(self, obj, value): self.warn() self.decorator.__set__(obj, value) def __delete__(self, obj): self.warn() self.decorator.__delete__(obj) def __repr__(self): return '' % ( self.attr, self.decorator) def warn(self): if not self.warning: raise DeprecationWarning( 'The attribute %s is deprecated: %s' % (self.attr, self.message)) else: warnings.warn( 'The attribute %s is deprecated: %s' % (self.attr, self.message), DeprecationWarning, stacklevel=3) def _parse_date(value): if not value: return None t = parsedate_tz(value) if t is None: # Could not parse return None if t[-1] is None: # No timezone given. None would mean local time, but we'll force UTC t = t[:9] + (0,) t = mktime_tz(t) return datetime.fromtimestamp(t, UTC) def _serialize_date(dt): if dt is None: return None if isinstance(dt, unicode): dt = dt.encode('ascii') if isinstance(dt, str): return dt if isinstance(dt, timedelta): dt = datetime.now() + dt if isinstance(dt, (datetime, date)): dt = dt.timetuple() if isinstance(dt, (tuple, time.struct_time)): dt = calendar.timegm(dt) if not isinstance(dt, (float, int, long)): raise ValueError( "You must pass in a datetime, date, time tuple, or integer object, not %r" % dt) return formatdate(dt) def _serialize_cookie_date(dt): if dt is None: return None if isinstance(dt, unicode): dt = dt.encode('ascii') if isinstance(dt, timedelta): dt = datetime.now() + dt if isinstance(dt, (datetime, date)): dt = dt.timetuple() return time.strftime('%a, %d-%b-%Y %H:%M:%S GMT', dt) def _parse_date_delta(value): """ like _parse_date, but also handle delta seconds """ if not value: return None try: value = int(value) except ValueError: pass else: delta = timedelta(seconds=value) return datetime.now() + delta return _parse_date(value) def _serialize_date_delta(value): if not value and value != 0: return None if isinstance(value, (float, int)): return str(int(value)) return _serialize_date(value) def _parse_etag(value, default=True): if value is None: value = '' value = value.strip() if not value: if default: return AnyETag else: return NoETag if value == '*': return AnyETag else: return ETagMatcher.parse(value) def _serialize_etag(value, default=True): if value is None: return None if value is AnyETag: if default: return None else: return '*' return str(value) def _parse_if_range(value): if not value: return NoIfRange else: return IfRange.parse(value) def _serialize_if_range(value): if value is None: return value if isinstance(value, (datetime, date)): return _serialize_date(value) if not isinstance(value, str): value = str(value) return value or None def _parse_range(value): if not value: return None # Might return None too: return Range.parse(value) def _serialize_range(value): if isinstance(value, (list, tuple)): if len(value) != 2: raise ValueError( "If setting .range to a list or tuple, it must be of length 2 (not %r)" % value) value = Range([value]) if value is None: return None value = str(value) return value or None def _parse_int(value): if value is None or value == '': return None return int(value) def _parse_int_safe(value): if value is None or value == '': return None try: return int(value) except ValueError: return None def _serialize_int(value): if value is None: return None return str(value) def _parse_content_range(value): if not value or not value.strip(): return None # May still return None return ContentRange.parse(value) def _serialize_content_range(value): if value is None: return None if isinstance(value, (tuple, list)): if len(value) not in (2, 3): raise ValueError( "When setting content_range to a list/tuple, it must " "be length 2 or 3 (not %r)" % value) if len(value) == 2: begin, end = value length = None else: begin, end, length = value value = ContentRange(begin, end, length) value = str(value).strip() if not value: return None return value def _parse_list(value): if value is None: return None value = value.strip() if not value: return None return [v.strip() for v in value.split(',') if v.strip()] def _serialize_list(value): if not value: return None if isinstance(value, unicode): value = str(value) if isinstance(value, str): return value return ', '.join(map(str, value)) def _parse_accept(value, header_name, AcceptClass, NilClass): if not value: return NilClass(header_name) return AcceptClass(header_name, value) def _serialize_accept(value, header_name, AcceptClass, NilClass): if not value or isinstance(value, NilClass): return None if isinstance(value, (list, tuple, dict)): value = NilClass(header_name) + value value = str(value).strip() if not value: return None return value class Request(object): ## Options: charset = None unicode_errors = 'strict' decode_param_names = False ## The limit after which request bodies should be stored on disk ## if they are read in (under this, and the request body is stored ## in memory): request_body_tempfile_limit = 10*1024 def __init__(self, environ=None, environ_getter=None, charset=NoDefault, unicode_errors=NoDefault, decode_param_names=NoDefault, **kw): if environ_getter is not None: raise ValueError('The environ_getter argument is no longer ' 'supported') if environ is None: raise TypeError("You must provide an environ arg") d = self.__dict__ d['environ'] = environ if charset is not NoDefault: d['charset'] = charset if unicode_errors is not NoDefault: d['unicode_errors'] = unicode_errors if decode_param_names is not NoDefault: d['decode_param_names'] = decode_param_names if kw: my_class = self.__class__ for name, value in kw.iteritems(): if not hasattr(my_class, name): raise TypeError( "Unexpected keyword: %s=%r" % (name, value)) setattr(self, name, value) def __setattr__(self, attr, value, DEFAULT=[]): ## FIXME: I don't know why I need this guard (though experimentation says I do) if getattr(self.__class__, attr, DEFAULT) is not DEFAULT or attr.startswith('_'): object.__setattr__(self, attr, value) else: self.environ.setdefault('webob.adhoc_attrs', {})[attr] = value def __getattr__(self, attr): ## FIXME: I don't know why I need this guard (though experimentation says I do) if attr in self.__class__.__dict__: return object.__getattribute__(self, attr) try: return self.environ['webob.adhoc_attrs'][attr] except KeyError: raise AttributeError(attr) def __delattr__(self, attr): ## FIXME: I don't know why I need this guard (though experimentation says I do) if attr in self.__class__.__dict__: return object.__delattr__(self, attr) try: del self.environ['webob.adhoc_attrs'][attr] except KeyError: raise AttributeError(attr) def _body_file__get(self): """ Access the body of the request (wsgi.input) as a file-like object. If you set this value, CONTENT_LENGTH will also be updated (either set to -1, 0 if you delete the attribute, or if you set the attribute to a string then the length of the string). """ return self.environ['wsgi.input'] def _body_file__set(self, value): if isinstance(value, str): length = len(value) value = StringIO(value) else: length = -1 self.environ['wsgi.input'] = value self.environ['CONTENT_LENGTH'] = str(length) def _body_file__del(self): self.environ['wsgi.input'] = StringIO('') self.environ['CONTENT_LENGTH'] = '0' body_file = property(_body_file__get, _body_file__set, _body_file__del, doc=_body_file__get.__doc__) scheme = environ_getter('wsgi.url_scheme') method = environ_getter('REQUEST_METHOD') script_name = environ_getter('SCRIPT_NAME') path_info = environ_getter('PATH_INFO') ## FIXME: should I strip out parameters?: content_type = environ_getter('CONTENT_TYPE', rfc_section='14.17') content_length = converter( environ_getter('CONTENT_LENGTH', rfc_section='14.13'), _parse_int_safe, _serialize_int, 'int') remote_user = environ_getter('REMOTE_USER', default=None) remote_addr = environ_getter('REMOTE_ADDR', default=None) query_string = environ_getter('QUERY_STRING') server_name = environ_getter('SERVER_NAME') server_port = converter( environ_getter('SERVER_PORT'), _parse_int, _serialize_int, 'int') _headers = None def _headers__get(self): """ All the request headers as a case-insensitive dictionary-like object. """ if self._headers is None: self._headers = EnvironHeaders(self.environ) return self._headers def _headers__set(self, value): self.headers.clear() self.headers.update(value) headers = property(_headers__get, _headers__set, doc=_headers__get.__doc__) def host_url(self): """ The URL through the host (no path) """ e = self.environ url = e['wsgi.url_scheme'] + '://' if e.get('HTTP_HOST'): host = e['HTTP_HOST'] if ':' in host: host, port = host.split(':', 1) else: port = None else: host = e['SERVER_NAME'] port = e['SERVER_PORT'] if self.environ['wsgi.url_scheme'] == 'https': if port == '443': port = None elif self.environ['wsgi.url_scheme'] == 'http': if port == '80': port = None url += host if port: url += ':%s' % port return url host_url = property(host_url, doc=host_url.__doc__) def application_url(self): """ The URL including SCRIPT_NAME (no PATH_INFO or query string) """ return self.host_url + urllib.quote(self.environ.get('SCRIPT_NAME', '')) application_url = property(application_url, doc=application_url.__doc__) def path_url(self): """ The URL including SCRIPT_NAME and PATH_INFO, but not QUERY_STRING """ return self.application_url + urllib.quote(self.environ.get('PATH_INFO', '')) path_url = property(path_url, doc=path_url.__doc__) def path(self): """ The path of the request, without host or query string """ return urllib.quote(self.script_name) + urllib.quote(self.path_info) path = property(path, doc=path.__doc__) def path_qs(self): """ The path of the request, without host but with query string """ path = self.path qs = self.environ.get('QUERY_STRING') if qs: path += '?' + qs return path path_qs = property(path_qs, doc=path_qs.__doc__) def url(self): """ The full request URL, including QUERY_STRING """ url = self.path_url if self.environ.get('QUERY_STRING'): url += '?' + self.environ['QUERY_STRING'] return url url = property(url, doc=url.__doc__) def relative_url(self, other_url, to_application=False): """ Resolve other_url relative to the request URL. If ``to_application`` is True, then resolve it relative to the URL with only SCRIPT_NAME """ if to_application: url = self.application_url if not url.endswith('/'): url += '/' else: url = self.path_url return urlparse.urljoin(url, other_url) def path_info_pop(self): """ 'Pops' off the next segment of PATH_INFO, pushing it onto SCRIPT_NAME, and returning the popped segment. Returns None if there is nothing left on PATH_INFO. Does not return ``''`` when there's an empty segment (like ``/path//path``); these segments are just ignored. """ path = self.path_info if not path: return None while path.startswith('/'): self.script_name += '/' path = path[1:] if '/' not in path: self.script_name += path self.path_info = '' return path else: segment, path = path.split('/', 1) self.path_info = '/' + path self.script_name += segment return segment def path_info_peek(self): """ Returns the next segment on PATH_INFO, or None if there is no next segment. Doesn't modify the environment. """ path = self.path_info if not path: return None path = path.lstrip('/') return path.split('/', 1)[0] def _urlvars__get(self): """ Return any *named* variables matched in the URL. Takes values from ``environ['wsgiorg.routing_args']``. Systems like ``routes`` set this value. """ if 'paste.urlvars' in self.environ: return self.environ['paste.urlvars'] elif 'wsgiorg.routing_args' in self.environ: return self.environ['wsgiorg.routing_args'][1] else: result = {} self.environ['wsgiorg.routing_args'] = ((), result) return result def _urlvars__set(self, value): environ = self.environ if 'wsgiorg.routing_args' in environ: environ['wsgiorg.routing_args'] = (environ['wsgiorg.routing_args'][0], value) if 'paste.urlvars' in environ: del environ['paste.urlvars'] elif 'paste.urlvars' in environ: environ['paste.urlvars'] = value else: environ['wsgiorg.routing_args'] = ((), value) def _urlvars__del(self): if 'paste.urlvars' in self.environ: del self.environ['paste.urlvars'] if 'wsgiorg.routing_args' in self.environ: if not self.environ['wsgiorg.routing_args'][0]: del self.environ['wsgiorg.routing_args'] else: self.environ['wsgiorg.routing_args'] = (self.environ['wsgiorg.routing_args'][0], {}) urlvars = property(_urlvars__get, _urlvars__set, _urlvars__del, doc=_urlvars__get.__doc__) def _urlargs__get(self): """ Return any *positional* variables matched in the URL. Takes values from ``environ['wsgiorg.routing_args']``. Systems like ``routes`` set this value. """ if 'wsgiorg.routing_args' in self.environ: return self.environ['wsgiorg.routing_args'][0] else: # Since you can't update this value in-place, we don't need # to set the key in the environment return () def _urlargs__set(self, value): environ = self.environ if 'paste.urlvars' in environ: # Some overlap between this and wsgiorg.routing_args; we need # wsgiorg.routing_args to make this work routing_args = (value, environ.pop('paste.urlvars')) elif 'wsgiorg.routing_args' in environ: routing_args = (value, environ['wsgiorg.routing_args'][1]) else: routing_args = (value, {}) environ['wsgiorg.routing_args'] = routing_args def _urlargs__del(self): if 'wsgiorg.routing_args' in self.environ: if not self.environ['wsgiorg.routing_args'][1]: del self.environ['wsgiorg.routing_args'] else: self.environ['wsgiorg.routing_args'] = ((), self.environ['wsgiorg.routing_args'][1]) urlargs = property(_urlargs__get, _urlargs__set, _urlargs__del, _urlargs__get.__doc__) def is_xhr(self): """Returns a boolean if X-Requested-With is present and ``XMLHttpRequest`` Note: this isn't set by every XMLHttpRequest request, it is only set if you are using a Javascript library that sets it (or you set the header yourself manually). Currently Prototype and jQuery are known to set this header.""" return self.environ.get('HTTP_X_REQUESTED_WITH', '') == 'XMLHttpRequest' is_xhr = property(is_xhr, doc=is_xhr.__doc__) def _host__get(self): """Host name provided in HTTP_HOST, with fall-back to SERVER_NAME""" if 'HTTP_HOST' in self.environ: return self.environ['HTTP_HOST'] else: return '%(SERVER_NAME)s:%(SERVER_PORT)s' % self.environ def _host__set(self, value): self.environ['HTTP_HOST'] = value def _host__del(self): if 'HTTP_HOST' in self.environ: del self.environ['HTTP_HOST'] host = property(_host__get, _host__set, _host__del, doc=_host__get.__doc__) def _body__get(self): """ Return the content of the request body. """ try: length = int(self.environ.get('CONTENT_LENGTH', '0')) except ValueError: return '' c = self.body_file.read(length) if hasattr(self.body_file, 'seek'): self.body_file.seek(0) else: tempfile_limit = self.request_body_tempfile_limit if tempfile_limit and len(c) > tempfile_limit: fileobj = tempfile.TemporaryFile() fileobj.write(c) fileobj.seek(0) else: fileobj = StringIO(c) # We don't want/need to lose CONTENT_LENGTH here (as setting # self.body_file would do): self.environ['wsgi.input'] = fileobj return c def _body__set(self, value): if value is None: del self.body return if not isinstance(value, str): raise TypeError( "You can only set Request.body to a str (not %r)" % type(value)) body_file = StringIO(value) self.body_file = body_file self.environ['CONTENT_LENGTH'] = str(len(value)) def _body__del(self, value): del self.body_file body = property(_body__get, _body__set, _body__del, doc=_body__get.__doc__) def str_POST(self): """ Return a MultiDict containing all the variables from a form request. Returns an empty dict-like object for non-form requests. Form requests are typically POST requests, however PUT requests with an appropriate Content-Type are also supported. """ env = self.environ if self.method not in ('POST', 'PUT'): return NoVars('Not a form request') if 'webob._parsed_post_vars' in env: vars, body_file = env['webob._parsed_post_vars'] if body_file is self.body_file: return vars # Paste compatibility: if 'paste.parsed_formvars' in env: # from paste.request.parse_formvars vars, body_file = env['paste.parsed_formvars'] if body_file is self.body_file: # FIXME: is it okay that this isn't *our* MultiDict? return vars content_type = self.content_type if ';' in content_type: content_type = content_type.split(';', 1)[0] if (self.method == 'PUT' and not content_type) or \ content_type not in ('', 'application/x-www-form-urlencoded', 'multipart/form-data'): # Not an HTML form submission return NoVars('Not an HTML form submission (Content-Type: %s)' % content_type) # FieldStorage assumes a default CONTENT_LENGTH of -1, but a # default of 0 is better: env.setdefault('CONTENT_LENGTH', 0) fs_environ = env.copy() fs_environ['QUERY_STRING'] = '' fs = cgi.FieldStorage(fp=self.body_file, environ=fs_environ, keep_blank_values=True) vars = MultiDict.from_fieldstorage(fs) FakeCGIBody.update_environ(env, vars) env['webob._parsed_post_vars'] = (vars, self.body_file) return vars str_POST = property(str_POST, doc=str_POST.__doc__) str_postvars = deprecated_property(str_POST, 'str_postvars', 'use str_POST instead') def POST(self): """ Like ``.str_POST``, but may decode values and keys """ vars = self.str_POST if self.charset: vars = UnicodeMultiDict(vars, encoding=self.charset, errors=self.unicode_errors, decode_keys=self.decode_param_names) return vars POST = property(POST, doc=POST.__doc__) postvars = deprecated_property(POST, 'postvars', 'use POST instead') def str_GET(self): """ Return a MultiDict containing all the variables from the QUERY_STRING. """ env = self.environ source = env.get('QUERY_STRING', '') if 'webob._parsed_query_vars' in env: vars, qs = env['webob._parsed_query_vars'] if qs == source: return vars if not source: vars = MultiDict() else: vars = MultiDict(cgi.parse_qsl( source, keep_blank_values=True, strict_parsing=False)) env['webob._parsed_query_vars'] = (vars, source) return vars str_GET = property(str_GET, doc=str_GET.__doc__) str_queryvars = deprecated_property(str_GET, 'str_queryvars', 'use str_GET instead') def GET(self): """ Like ``.str_GET``, but may decode values and keys """ vars = self.str_GET if self.charset: vars = UnicodeMultiDict(vars, encoding=self.charset, errors=self.unicode_errors, decode_keys=self.decode_param_names) return vars GET = property(GET, doc=GET.__doc__) queryvars = deprecated_property(GET, 'queryvars', 'use GET instead') def str_params(self): """ A dictionary-like object containing both the parameters from the query string and request body. """ return NestedMultiDict(self.str_GET, self.str_POST) str_params = property(str_params, doc=str_params.__doc__) def params(self): """ Like ``.str_params``, but may decode values and keys """ params = self.str_params if self.charset: params = UnicodeMultiDict(params, encoding=self.charset, errors=self.unicode_errors, decode_keys=self.decode_param_names) return params params = property(params, doc=params.__doc__) _rx_quotes = re.compile('"(.*)"') def str_cookies(self): """ Return a *plain* dictionary of cookies as found in the request. """ env = self.environ source = env.get('HTTP_COOKIE', '') if 'webob._parsed_cookies' in env: vars, var_source = env['webob._parsed_cookies'] if var_source == source: return vars vars = {} if source: cookies = BaseCookie() cookies.load(source) for name in cookies: value = cookies[name].value unquote_match = self._rx_quotes.match(value) if unquote_match is not None: value = unquote_match.group(1) vars[name] = value env['webob._parsed_cookies'] = (vars, source) return vars str_cookies = property(str_cookies, doc=str_cookies.__doc__) def cookies(self): """ Like ``.str_cookies``, but may decode values and keys """ vars = self.str_cookies if self.charset: vars = UnicodeMultiDict(vars, encoding=self.charset, errors=self.unicode_errors, decode_keys=self.decode_param_names) return vars cookies = property(cookies, doc=cookies.__doc__) def copy(self): """ Copy the request and environment object. This only does a shallow copy, except of wsgi.input """ env = self.environ.copy() new_req = self.__class__(env) new_req.copy_body() return new_req def copy_get(self): """ Copies the request and environment object, but turning this request into a GET along the way. If this was a POST request (or any other verb) then it becomes GET, and the request body is thrown away. """ env = self.environ.copy() env['wsgi.input'] = StringIO('') env['CONTENT_LENGTH'] = '0' if 'CONTENT_TYPE' in env: del env['CONTENT_TYPE'] env['REQUEST_METHOD'] = 'GET' return self.__class__(env) def make_body_seekable(self): """ This forces ``environ['wsgi.input']`` to be seekable. That is, if it doesn't have a `seek` method already, the content is copied into a StringIO or temporary file. The choice to copy to StringIO is made from ``self.request_body_tempfile_limit`` """ input = self.body_file if hasattr(input, 'seek'): # It has a seek method, so we don't need to do anything return self.copy_body() def copy_body(self): """ Copies the body, in cases where it might be shared with another request object and that is not desired. This copies the body in-place, either into a StringIO object or a temporary file. """ length = self.content_length if length == 0: # No real need to copy this, but of course it is free self.body_file = StringIO('') return tempfile_limit = self.request_body_tempfile_limit body = None input = self.body_file if hasattr(input, 'seek'): # Just in case someone has read parts of the body already ## FIXME: Should we use .tell() to try to put the body ## back to its previous position? input.seek(0) if length in (-1, None): body = self.body length = len(body) self.content_length = length if tempfile_limit and length > tempfile_limit: fileobj = tempfile.TemporaryFile() if body is None: while length: data = input.read(min(length, 4096)) fileobj.write(data) length -= len(data) else: fileobj.write(body) fileobj.seek(0) else: if body is None: body = input.read(length) fileobj = StringIO(body) self.body_file = fileobj def remove_conditional_headers(self, remove_encoding=True, remove_range=True, remove_match=True, remove_modified=True): """ Remove headers that make the request conditional. These headers can cause the response to be 304 Not Modified, which in some cases you may not want to be possible. This does not remove headers like If-Match, which are used for conflict detection. """ check_keys = [] if remove_range: check_keys += ['HTTP_IF_RANGE', 'HTTP_RANGE'] if remove_match: check_keys.append('HTTP_IF_NONE_MATCH') if remove_modified: check_keys.append('HTTP_IF_MODIFIED_SINCE') if remove_encoding: check_keys.append('HTTP_ACCEPT_ENCODING') for key in check_keys: if key in self.environ: del self.environ[key] accept = converter( environ_getter('HTTP_ACCEPT', rfc_section='14.1'), _parse_accept, _serialize_accept, 'MIME Accept', converter_args=('Accept', MIMEAccept, MIMENilAccept)) accept_charset = converter( environ_getter('HTTP_ACCEPT_CHARSET', rfc_section='14.2'), _parse_accept, _serialize_accept, 'accept header', converter_args=('Accept-Charset', Accept, NilAccept)) accept_encoding = converter( environ_getter('HTTP_ACCEPT_ENCODING', rfc_section='14.3'), _parse_accept, _serialize_accept, 'accept header', converter_args=('Accept-Encoding', Accept, NoAccept)) accept_language = converter( environ_getter('HTTP_ACCEPT_LANGUAGE', rfc_section='14.4'), _parse_accept, _serialize_accept, 'accept header', converter_args=('Accept-Language', Accept, NilAccept)) ## FIXME: 14.8 Authorization ## http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.8 def _cache_control__get(self): """ Get/set/modify the Cache-Control header (section `14.9 `_) """ env = self.environ value = env.get('HTTP_CACHE_CONTROL', '') cache_header, cache_obj = env.get('webob._cache_control', (None, None)) if cache_obj is not None and cache_header == value: return cache_obj cache_obj = CacheControl.parse(value, type='request') env['webob._cache_control'] = (value, cache_obj) return cache_obj def _cache_control__set(self, value): env = self.environ if not value: value = "" if isinstance(value, dict): value = CacheControl(value, type='request') elif isinstance(value, CacheControl): str_value = str(value) env['HTTP_CACHE_CONTROL'] = str_value env['webob._cache_control'] = (str_value, value) else: env['HTTP_CACHE_CONTROL'] = str(value) if 'webob._cache_control' in env: del env['webob._cache_control'] def _cache_control__del(self, value): env = self.environ if 'HTTP_CACHE_CONTROL' in env: del env['HTTP_CACHE_CONTROL'] if 'webob._cache_control' in env: del env['webob._cache_control'] cache_control = property(_cache_control__get, _cache_control__set, _cache_control__del, doc=_cache_control__get.__doc__) date = converter( environ_getter('HTTP_DATE', rfc_section='14.8'), _parse_date, _serialize_date, 'HTTP date') if_match = converter( environ_getter('HTTP_IF_MATCH', rfc_section='14.24'), _parse_etag, _serialize_etag, 'ETag', converter_args=(True,)) if_modified_since = converter( environ_getter('HTTP_IF_MODIFIED_SINCE', rfc_section='14.25'), _parse_date, _serialize_date, 'HTTP date') if_none_match = converter( environ_getter('HTTP_IF_NONE_MATCH', rfc_section='14.26'), _parse_etag, _serialize_etag, 'ETag', converter_args=(False,)) if_range = converter( environ_getter('HTTP_IF_RANGE', rfc_section='14.27'), _parse_if_range, _serialize_if_range, 'IfRange object') if_unmodified_since = converter( environ_getter('HTTP_IF_UNMODIFIED_SINCE', rfc_section='14.28'), _parse_date, _serialize_date, 'HTTP date') max_forwards = converter( environ_getter('HTTP_MAX_FORWARDS', rfc_section='14.31'), _parse_int, _serialize_int, 'int') pragma = environ_getter('HTTP_PRAGMA', rfc_section='14.32') range = converter( environ_getter('HTTP_RANGE', rfc_section='14.35'), _parse_range, _serialize_range, 'Range object') referer = environ_getter('HTTP_REFERER', rfc_section='14.36') referrer = referer user_agent = environ_getter('HTTP_USER_AGENT', rfc_section='14.43') def __repr__(self): msg = '<%s at 0x%x %s %s>' % ( self.__class__.__name__, abs(id(self)), self.method, self.url) return msg def __str__(self): url = self.url host = self.host_url assert url.startswith(host) url = url[len(host):] if 'Host' not in self.headers: self.headers['Host'] = self.host parts = ['%s %s' % (self.method, url)] for name, value in sorted(self.headers.items()): parts.append('%s: %s' % (name, value)) parts.append('') parts.append(self.body) return '\r\n'.join(parts) def call_application(self, application, catch_exc_info=False): """ Call the given WSGI application, returning ``(status_string, headerlist, app_iter)`` Be sure to call ``app_iter.close()`` if it's there. If catch_exc_info is true, then returns ``(status_string, headerlist, app_iter, exc_info)``, where the fourth item may be None, but won't be if there was an exception. If you don't do this and there was an exception, the exception will be raised directly. """ captured = [] output = [] def start_response(status, headers, exc_info=None): if exc_info is not None and not catch_exc_info: raise exc_info[0], exc_info[1], exc_info[2] captured[:] = [status, headers, exc_info] return output.append app_iter = application(self.environ, start_response) if (not captured or output): try: output.extend(app_iter) finally: if hasattr(app_iter, 'close'): app_iter.close() app_iter = output if catch_exc_info: return (captured[0], captured[1], app_iter, captured[2]) else: return (captured[0], captured[1], app_iter) # Will be filled in later: ResponseClass = None def get_response(self, application, catch_exc_info=False): """ Like ``.call_application(application)``, except returns a response object with ``.status``, ``.headers``, and ``.body`` attributes. This will use ``self.ResponseClass`` to figure out the class of the response object to return. """ if catch_exc_info: status, headers, app_iter, exc_info = self.call_application( application, catch_exc_info=True) del exc_info else: status, headers, app_iter = self.call_application( application, catch_exc_info=False) return self.ResponseClass( status=status, headerlist=headers, app_iter=app_iter, request=self) #@classmethod def blank(cls, path, environ=None, base_url=None, headers=None, **kw): """ Create a blank request environ (and Request wrapper) with the given path (path should be urlencoded), and any keys from environ. The path will become path_info, with any query string split off and used. All necessary keys will be added to the environ, but the values you pass in will take precedence. If you pass in base_url then wsgi.url_scheme, HTTP_HOST, and SCRIPT_NAME will be filled in from that value. Any extra keyword will be passed to ``__init__`` (e.g., ``decode_param_names``). """ if _SCHEME_RE.search(path): scheme, netloc, path, qs, fragment = urlparse.urlsplit(path) if fragment: raise TypeError( "Path cannot contain a fragment (%r)" % fragment) if qs: path += '?' + qs if ':' not in netloc: if scheme == 'http': netloc += ':80' elif scheme == 'https': netloc += ':443' else: raise TypeError("Unknown scheme: %r" % scheme) else: scheme = 'http' netloc = 'localhost:80' if path and '?' in path: path_info, query_string = path.split('?', 1) path_info = urllib.unquote(path_info) else: path_info = urllib.unquote(path) query_string = '' env = { 'REQUEST_METHOD': 'GET', 'SCRIPT_NAME': '', 'PATH_INFO': path_info or '', 'QUERY_STRING': query_string, 'SERVER_NAME': netloc.split(':')[0], 'SERVER_PORT': netloc.split(':')[1], 'HTTP_HOST': netloc, 'SERVER_PROTOCOL': 'HTTP/1.0', 'wsgi.version': (1, 0), 'wsgi.url_scheme': scheme, 'wsgi.input': StringIO(''), 'wsgi.errors': sys.stderr, 'wsgi.multithread': False, 'wsgi.multiprocess': False, 'wsgi.run_once': False, } if base_url: scheme, netloc, path, query, fragment = urlparse.urlsplit(base_url) if query or fragment: raise ValueError( "base_url (%r) cannot have a query or fragment" % base_url) if scheme: env['wsgi.url_scheme'] = scheme if netloc: if ':' not in netloc: if scheme == 'http': netloc += ':80' elif scheme == 'https': netloc += ':443' else: raise ValueError( "Unknown scheme: %r" % scheme) host, port = netloc.split(':', 1) env['SERVER_PORT'] = port env['SERVER_NAME'] = host env['HTTP_HOST'] = netloc if path: env['SCRIPT_NAME'] = urllib.unquote(path) if environ: env.update(environ) obj = cls(env, **kw) if headers is not None: obj.headers.update(headers) return obj blank = classmethod(blank) class Response(object): """ Represents a WSGI response """ default_content_type = 'text/html' default_charset = 'UTF-8' unicode_errors = 'strict' default_conditional_response = False def __init__(self, body=None, status=None, headerlist=None, app_iter=None, request=None, content_type=None, conditional_response=NoDefault, **kw): if app_iter is None: if body is None: body = '' elif body is not None: raise TypeError( "You may only give one of the body and app_iter arguments") if status is None: self._status = '200 OK' else: self.status = status if headerlist is None: self._headerlist = [] else: self._headerlist = headerlist self._headers = None if request is not None: if hasattr(request, 'environ'): self._environ = request.environ self._request = request else: self._environ = request self._request = None else: self._environ = self._request = None if content_type is None: content_type = self.default_content_type charset = None if 'charset' in kw: charset = kw.pop('charset') elif self.default_charset and headerlist is None: if content_type and (content_type == 'text/html' or content_type.startswith('text/') or content_type.startswith('application/xml') or (content_type.startswith('application/') and content_type.endswith('+xml'))): charset = self.default_charset if content_type and charset: content_type += '; charset=' + charset elif self._headerlist and charset: self.charset = charset if not self._headerlist and content_type: self._headerlist.append(('Content-Type', content_type)) if conditional_response is NoDefault: self.conditional_response = self.default_conditional_response else: self.conditional_response = conditional_response if app_iter is not None: self._app_iter = app_iter self._body = None else: if isinstance(body, unicode): if charset is None: raise TypeError( "You cannot set the body to a unicode value without a charset") body = body.encode(charset) self._body = body if headerlist is None: self._headerlist.append(('Content-Length', str(len(body)))) else: self.headers['Content-Length'] = str(len(body)) self._app_iter = None for name, value in kw.iteritems(): if not hasattr(self.__class__, name): # Not a basic attribute raise TypeError( "Unexpected keyword: %s=%r" % (name, value)) setattr(self, name, value) def __repr__(self): return '<%s at 0x%x %s>' % ( self.__class__.__name__, abs(id(self)), self.status) def __str__(self): return (self.status + '\n' + '\n'.join(['%s: %s' % (name, value) for name, value in self.headerlist]) + '\n\n' + self.body) def copy(self): """Makes a copy of the response""" if self._app_iter is not None: app_iter = self._app_iter else: app_iter = [self._body] return self.__class__( content_type=False, status=self._status, headerlist=self._headerlist, app_iter=app_iter, conditional_response=self.conditional_response) def _status__get(self): """ The status string """ return self._status def _status__set(self, value): if isinstance(value, int): value = str(value) if not isinstance(value, str): raise TypeError( "You must set status to a string or integer (not %s)" % type(value)) if ' ' not in value: # Need to add a reason: code = int(value) reason = status_reasons[code] value += ' ' + reason self._status = value status = property(_status__get, _status__set, doc=_status__get.__doc__) def _status_int__get(self): """ The status as an integer """ return int(self.status.split()[0]) def _status_int__set(self, value): self.status = value status_int = property(_status_int__get, _status_int__set, doc=_status_int__get.__doc__) def _headerlist__get(self): """ The list of response headers """ return self._headerlist def _headerlist__set(self, value): self._headers = None if not isinstance(value, list): if hasattr(value, 'items'): value = value.items() value = list(value) self._headerlist = value def _headerlist__del(self): self.headerlist = [] headerlist = property(_headerlist__get, _headerlist__set, _headerlist__del, doc=_headerlist__get.__doc__) def _charset__get(self): """ Get/set the charset (in the Content-Type) """ header = self.headers.get('content-type') if not header: return None match = _CHARSET_RE.search(header) if match: return match.group(1) return None def _charset__set(self, charset): if charset is None: del self.charset return try: header = self.headers.pop('content-type') except KeyError: raise AttributeError( "You cannot set the charset when no content-type is defined") match = _CHARSET_RE.search(header) if match: header = header[:match.start()] + header[match.end():] header += '; charset=%s' % charset self.headers['content-type'] = header def _charset__del(self): try: header = self.headers.pop('content-type') except KeyError: # Don't need to remove anything return match = _CHARSET_RE.search(header) if match: header = header[:match.start()] + header[match.end():] self.headers['content-type'] = header charset = property(_charset__get, _charset__set, _charset__del, doc=_charset__get.__doc__) def _content_type__get(self): """ Get/set the Content-Type header (or None), *without* the charset or any parameters. If you include parameters (or ``;`` at all) when setting the content_type, any existing parameters will be deleted; otherwise they will be preserved. """ header = self.headers.get('content-type') if not header: return None return header.split(';', 1)[0] def _content_type__set(self, value): if ';' not in value: header = self.headers.get('content-type', '') if ';' in header: params = header.split(';', 1)[1] value += ';' + params self.headers['content-type'] = value def _content_type__del(self): try: del self.headers['content-type'] except KeyError: pass content_type = property(_content_type__get, _content_type__set, _content_type__del, doc=_content_type__get.__doc__) def _content_type_params__get(self): """ Returns a dictionary of all the parameters in the content type. """ params = self.headers.get('content-type', '') if ';' not in params: return {} params = params.split(';', 1)[1] result = {} for match in _PARAM_RE.finditer(params): result[match.group(1)] = match.group(2) or match.group(3) or '' return result def _content_type_params__set(self, value_dict): if not value_dict: del self.content_type_params return params = [] for k, v in sorted(value_dict.items()): if not _OK_PARAM_RE.search(v): ## FIXME: I'm not sure what to do with "'s in the parameter value ## I think it might be simply illegal v = '"%s"' % v.replace('"', '\\"') params.append('; %s=%s' % (k, v)) ct = self.headers.pop('content-type', '').split(';', 1)[0] ct += ''.join(params) self.headers['content-type'] = ct def _content_type_params__del(self, value): self.headers['content-type'] = self.headers.get('content-type', '').split(';', 1)[0] content_type_params = property(_content_type_params__get, _content_type_params__set, _content_type_params__del, doc=_content_type_params__get.__doc__) def _headers__get(self): """ The headers in a dictionary-like object """ if self._headers is None: self._headers = HeaderDict.view_list(self.headerlist) return self._headers def _headers__set(self, value): if hasattr(value, 'items'): value = value.items() self.headerlist = value self._headers = None headers = property(_headers__get, _headers__set, doc=_headers__get.__doc__) def _body__get(self): """ The body of the response, as a ``str``. This will read in the entire app_iter if necessary. """ if self._body is None: if self._app_iter is None: raise AttributeError( "No body has been set") try: self._body = ''.join(self._app_iter) finally: if hasattr(self._app_iter, 'close'): self._app_iter.close() self._app_iter = None self.content_length = len(self._body) return self._body def _body__set(self, value): if isinstance(value, unicode): raise TypeError( "You cannot set Response.body to a unicode object (use Response.unicode_body)") if not isinstance(value, str): raise TypeError( "You can only set the body to a str (not %s)" % type(value)) try: if self._body or self._app_iter: self.content_md5 = None except AttributeError: # if setting body early in initialization _body and _app_iter don't exist yet pass self._body = value self.content_length = len(value) self._app_iter = None def _body__del(self): self._body = None self.content_length = None self._app_iter = None body = property(_body__get, _body__set, _body__del, doc=_body__get.__doc__) def _body_file__get(self): """ Returns a file-like object that can be used to write to the body. If you passed in a list app_iter, that app_iter will be modified by writes. """ return ResponseBodyFile(self) def _body_file__del(self): del self.body body_file = property(_body_file__get, fdel=_body_file__del, doc=_body_file__get.__doc__) def write(self, text): if isinstance(text, unicode): self.unicode_body += text else: self.body += text def _unicode_body__get(self): """ Get/set the unicode value of the body (using the charset of the Content-Type) """ if not self.charset: raise AttributeError( "You cannot access Response.unicode_body unless charset is set") body = self.body return body.decode(self.charset, self.unicode_errors) def _unicode_body__set(self, value): if not self.charset: raise AttributeError( "You cannot access Response.unicode_body unless charset is set") if not isinstance(value, unicode): raise TypeError( "You can only set Response.unicode_body to a unicode string (not %s)" % type(value)) self.body = value.encode(self.charset) def _unicode_body__del(self): del self.body unicode_body = property(_unicode_body__get, _unicode_body__set, _unicode_body__del, doc=_unicode_body__get.__doc__) def _app_iter__get(self): """ Returns the app_iter of the response. If body was set, this will create an app_iter from that body (a single-item list) """ if self._app_iter is None: if self._body is None: raise AttributeError( "No body or app_iter has been set") return [self._body] else: return self._app_iter def _app_iter__set(self, value): if self._body is not None: # Undo the automatically-set content-length self.content_length = None self._app_iter = value self._body = None def _app_iter__del(self): self.content_length = None self._app_iter = self._body = None app_iter = property(_app_iter__get, _app_iter__set, _app_iter__del, doc=_app_iter__get.__doc__) def set_cookie(self, key, value='', max_age=None, path='/', domain=None, secure=None, httponly=False, version=None, comment=None, expires=None): """ Set (add) a cookie for the response """ if isinstance(value, unicode) and self.charset is not None: value = '"%s"' % value.encode(self.charset) cookies = BaseCookie() cookies[key] = value if isinstance(max_age, timedelta): max_age = max_age.seconds + max_age.days*24*60*60 if max_age is not None and expires is None: expires = datetime.utcnow() + timedelta(seconds=max_age) if isinstance(expires, timedelta): expires = datetime.utcnow() + expires if isinstance(expires, datetime): expires = '"'+_serialize_cookie_date(expires)+'"' for var_name, var_value in [ ('max_age', max_age), ('path', path), ('domain', domain), ('secure', secure), ('HttpOnly', httponly), ('version', version), ('comment', comment), ('expires', expires), ]: if var_value is not None and var_value is not False: cookies[key][var_name.replace('_', '-')] = str(var_value) header_value = cookies[key].output(header='').lstrip() if header_value.endswith(';'): # Python 2.4 adds a trailing ; to the end, strip it to be # consistent with 2.5 header_value = header_value[:-1] self.headerlist.append(('Set-Cookie', header_value)) def delete_cookie(self, key, path='/', domain=None): """ Delete a cookie from the client. Note that path and domain must match how the cookie was originally set. This sets the cookie to the empty string, and max_age=0 so that it should expire immediately. """ self.set_cookie(key, '', path=path, domain=domain, max_age=0, expires=timedelta(days=-5)) def unset_cookie(self, key): """ Unset a cookie with the given name (remove it from the response). If there are multiple cookies (e.g., two cookies with the same name and different paths or domains), all such cookies will be deleted. """ existing = self.headers.getall('Set-Cookie') if not existing: raise KeyError( "No cookies at all have been set") del self.headers['Set-Cookie'] found = False for header in existing: cookies = BaseCookie() cookies.load(header) if key in cookies: found = True del cookies[key] header = cookies.output(header='').lstrip() if header: if header.endswith(';'): # Python 2.4 adds a trailing ; to the end, strip it # to be consistent with 2.5 header = header[:-1] self.headers.add('Set-Cookie', header) if not found: raise KeyError( "No cookie has been set with the name %r" % key) def _location__get(self): """ Retrieve the Location header of the response, or None if there is no header. If the header is not absolute and this response is associated with a request, make the header absolute. For more information see `section 14.30 `_. """ if 'location' not in self.headers: return None location = self.headers['location'] if _SCHEME_RE.search(location): # Absolute return location if self.request is not None: base_uri = self.request.url location = urlparse.urljoin(base_uri, location) return location def _location__set(self, value): if value is None: del self.location return if isinstance(value, unicode): value = value.encode('ISO-8859-1') if not _SCHEME_RE.search(value): # Not absolute, see if we can make it absolute if self.request is not None: value = urlparse.urljoin(self.request.url, value) self.headers['location'] = value def _location__del(self): if 'location' in self.headers: del self.headers['location'] location = property(_location__get, _location__set, _location__del, doc=_location__get.__doc__) accept_ranges = header_getter('Accept-Ranges', rfc_section='14.5') age = converter( header_getter('Age', rfc_section='14.6'), _parse_int_safe, _serialize_int, 'int') allow = converter( header_getter('Allow', rfc_section='14.7'), _parse_list, _serialize_list, 'list') _cache_control_obj = None def _cache_control__get(self): """ Get/set/modify the Cache-Control header (section `14.9 `_) """ value = self.headers.get('cache-control', '') if self._cache_control_obj is None: self._cache_control_obj = CacheControl.parse(value, updates_to=self._update_cache_control, type='response') self._cache_control_obj.header_value = value if self._cache_control_obj.header_value != value: new_obj = CacheControl.parse(value, type='response') self._cache_control_obj.properties.clear() self._cache_control_obj.properties.update(new_obj.properties) self._cache_control_obj.header_value = value return self._cache_control_obj def _cache_control__set(self, value): # This actually becomes a copy if not value: value = "" if isinstance(value, dict): value = CacheControl(value, 'response') if isinstance(value, unicode): value = str(value) if isinstance(value, str): if self._cache_control_obj is None: self.headers['Cache-Control'] = value return value = CacheControl.parse(value, 'response') cache = self.cache_control cache.properties.clear() cache.properties.update(value.properties) def _cache_control__del(self): self.cache_control = {} def _update_cache_control(self, prop_dict): value = serialize_cache_control(prop_dict) if not value: if 'Cache-Control' in self.headers: del self.headers['Cache-Control'] else: self.headers['Cache-Control'] = value cache_control = property(_cache_control__get, _cache_control__set, _cache_control__del, doc=_cache_control__get.__doc__) def _cache_expires(self, seconds=0, **kw): """ Set expiration on this request. This sets the response to expire in the given seconds, and any other attributes are used for cache_control (e.g., private=True, etc). """ cache_control = self.cache_control if isinstance(seconds, timedelta): seconds = timedelta_to_seconds(seconds) if not seconds: # To really expire something, you have to force a # bunch of these cache control attributes, and IE may # not pay attention to those still so we also set # Expires. cache_control.no_store = True cache_control.no_cache = True cache_control.must_revalidate = True cache_control.max_age = 0 cache_control.post_check = 0 cache_control.pre_check = 0 self.expires = datetime.utcnow() if 'last-modified' not in self.headers: self.last_modified = datetime.utcnow() self.pragma = 'no-cache' else: cache_control.max_age = seconds self.expires = datetime.utcnow() + timedelta(seconds=seconds) for name, value in kw.items(): setattr(cache_control, name, value) cache_expires = set_via_call(_cache_expires, _adapt_cache_expires) content_encoding = header_getter('Content-Encoding', rfc_section='14.11') def encode_content(self, encoding='gzip'): """ Encode the content with the given encoding (only gzip and identity are supported). """ if encoding == 'identity': self.decode_content() return if encoding != 'gzip': raise ValueError( "Unknown encoding: %r" % encoding) if self.content_encoding: if self.content_encoding == encoding: return self.decode_content() from webob.util.safegzip import GzipFile f = StringIO() gzip_f = GzipFile(filename='', mode='w', fileobj=f) gzip_f.write(self.body) gzip_f.close() new_body = f.getvalue() f.close() self.content_encoding = 'gzip' self.body = new_body def decode_content(self): content_encoding = self.content_encoding if not content_encoding or content_encoding == 'identity': return if content_encoding != 'gzip': raise ValueError( "I don't know how to decode the content %s" % content_encoding) from webob.util.safegzip import GzipFile f = StringIO(self.body) gzip_f = GzipFile(filename='', mode='r', fileobj=f) new_body = gzip_f.read() gzip_f.close() f.close() self.content_encoding = None self.body = new_body content_language = converter( header_getter('Content-Language', rfc_section='14.12'), _parse_list, _serialize_list, 'list') content_location = header_getter( 'Content-Location', rfc_section='14.14') content_md5 = header_getter( 'Content-MD5', rfc_section='14.14') content_range = converter( header_getter('Content-Range', rfc_section='14.16'), _parse_content_range, _serialize_content_range, 'ContentRange object') content_length = converter( header_getter('Content-Length', rfc_section='14.17'), _parse_int, _serialize_int, 'int') date = converter( header_getter('Date', rfc_section='14.18'), _parse_date, _serialize_date, 'HTTP date') etag = header_getter('ETag', rfc_section='14.19') def md5_etag(self, body=None, set_content_md5=False, set_conditional_response=False): """ Generate an etag for the response object using an MD5 hash of the body (the body parameter, or ``self.body`` if not given) Sets ``self.etag`` If ``set_content_md5`` is True sets ``self.content_md5`` as well If ``set_conditional_response`` is True sets ``self.conditional_response`` to True """ if body is None: body = self.body try: from hashlib import md5 except ImportError: from md5 import md5 h = md5(body) md5_digest = h.digest().encode('base64').replace('\n', '').strip('=') self.etag = md5_digest if set_content_md5: self.content_md5 = md5_digest if set_conditional_response: self.conditional_response = True expires = converter( header_getter('Expires', rfc_section='14.21'), _parse_date, _serialize_date, 'HTTP date') last_modified = converter( header_getter('Last-Modified', rfc_section='14.29'), _parse_date, _serialize_date, 'HTTP date') pragma = header_getter('Pragma', rfc_section='14.32') retry_after = converter( header_getter('Retry-After', rfc_section='14.37'), _parse_date_delta, _serialize_date_delta, 'HTTP date or delta seconds') server = header_getter('Server', rfc_section='14.38') ## FIXME: I realize response.vary += 'something' won't work. It should. ## Maybe for all listy headers. vary = converter( header_getter('Vary', rfc_section='14.44'), _parse_list, _serialize_list, 'list') ## FIXME: 14.47 WWW-Authenticate ## http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.47 def _request__get(self): """ Return the request associated with this response if any. """ if self._request is None and self._environ is not None: self._request = self.RequestClass(self._environ) return self._request def _request__set(self, value): if value is None: del self.request return if isinstance(value, dict): self._environ = value self._request = None else: self._request = value self._environ = value.environ def _request__del(self): self._request = self._environ = None request = property(_request__get, _request__set, _request__del, doc=_request__get.__doc__) def _environ__get(self): """ Get/set the request environ associated with this response, if any. """ return self._environ def _environ__set(self, value): if value is None: del self.environ self._environ = value self._request = None def _environ__del(self): self._request = self._environ = None environ = property(_environ__get, _environ__set, _environ__del, doc=_environ__get.__doc__) def __call__(self, environ, start_response): """ WSGI application interface """ if self.conditional_response: return self.conditional_response_app(environ, start_response) start_response(self.status, self.headerlist) if environ['REQUEST_METHOD'] == 'HEAD': # Special case here... return EmptyResponse(self.app_iter) return self.app_iter _safe_methods = ('GET', 'HEAD') def conditional_response_app(self, environ, start_response): """ Like the normal __call__ interface, but checks conditional headers: * If-Modified-Since (304 Not Modified; only on GET, HEAD) * If-None-Match (304 Not Modified; only on GET, HEAD) * Range (406 Partial Content; only on GET, HEAD) """ req = self.RequestClass(environ) status304 = False if req.method in self._safe_methods: if req.if_modified_since and self.last_modified and self.last_modified <= req.if_modified_since: status304 = True if req.if_none_match and self.etag: ## FIXME: should a weak match be okay? if self.etag in req.if_none_match: status304 = True else: # Even if If-Modified-Since matched, if ETag doesn't then reject it status304 = False if status304: start_response('304 Not Modified', self.headerlist) return EmptyResponse(self.app_iter) if req.method == 'HEAD': start_response(self.status, self.headerlist) return EmptyResponse(self.app_iter) if (req.range and req.if_range.match_response(self) and self.content_range is None and req.method == 'GET' and self.status_int == 200): content_range = req.range.content_range(self.content_length) if content_range is not None: app_iter = self.app_iter_range(content_range.start, content_range.stop) if app_iter is not None: headers = list(self.headerlist) headers.append(('Content-Range', str(content_range))) start_response('206 Partial Content', headers) return app_iter start_response(self.status, self.headerlist) return self.app_iter def app_iter_range(self, start, stop): """ Return a new app_iter built from the response app_iter, that serves up only the given ``start:stop`` range. """ if self._app_iter is None: return [self.body[start:stop]] app_iter = self.app_iter if hasattr(app_iter, 'app_iter_range'): return app_iter.app_iter_range(start, stop) return AppIterRange(app_iter, start, stop) Request.ResponseClass = Response Response.RequestClass = Request def _cgi_FieldStorage__repr__patch(self): """ monkey patch for FieldStorage.__repr__ Unbelievely, the default __repr__ on FieldStorage reads the entire file content instead of being sane about it. This is a simple replacement that doesn't do that """ if self.file: return "FieldStorage(%r, %r)" % ( self.name, self.filename) return "FieldStorage(%r, %r, %r)" % ( self.name, self.filename, self.value) cgi.FieldStorage.__repr__ = _cgi_FieldStorage__repr__patch class FakeCGIBody(object): def __init__(self, vars): self.vars = vars self._body = None self.position = 0 def read(self, size=-1): body = self._get_body() if size == -1: v = body[self.position:] self.position = len(body) return v else: v = body[self.position:self.position+size] self.position = min(len(body), self.position+size) return v def _get_body(self): if self._body is None: self._body = urllib.urlencode(self.vars.items()) return self._body def readline(self, size=None): # We ignore size, but allow it to be hinted rest = self._get_body()[self.position:] next = rest.find('\r\n') if next == -1: return self.read() self.position += next+2 return rest[:next+2] def readlines(self, hint=None): # Again, allow hint but ignore body = self._get_body() rest = body[self.position:] self.position = len(body) result = [] while 1: next = rest.find('\r\n') if next == -1: result.append(rest) break result.append(rest[:next+2]) rest = rest[next+2:] return result def __iter__(self): return iter(self.readlines()) def __repr__(self): inner = repr(self.vars) if len(inner) > 20: inner = inner[:15] + '...' + inner[-5:] return '<%s at 0x%x viewing %s>' % ( self.__class__.__name__, abs(id(self)), inner) #@classmethod def update_environ(cls, environ, vars): obj = cls(vars) environ['CONTENT_LENGTH'] = '-1' environ['wsgi.input'] = obj update_environ = classmethod(update_environ) class ResponseBodyFile(object): def __init__(self, response): self.response = response def __repr__(self): return '' % ( self.response) def close(self): raise NotImplementedError( "Response bodies cannot be closed") def flush(self): pass def write(self, s): if isinstance(s, unicode): if self.response.charset is not None: s = s.encode(self.response.charset) else: raise TypeError( "You can only write unicode to Response.body_file " "if charset has been set") if not isinstance(s, str): raise TypeError( "You can only write str to a Response.body_file, not %s" % type(s)) if not isinstance(self.response._app_iter, list): body = self.response.body if body: self.response.app_iter = [body] else: self.response.app_iter = [] self.response.app_iter.append(s) def writelines(self, seq): for item in seq: self.write(item) closed = False def encoding(self): """ The encoding of the file (inherited from response.charset) """ return self.response.charset encoding = property(encoding, doc=encoding.__doc__) mode = 'wb' class AppIterRange(object): """ Wraps an app_iter, returning just a range of bytes """ def __init__(self, app_iter, start, stop): assert start >= 0, "Bad start: %r" % start assert stop is None or (stop >= 0 and stop >= start), ( "Bad stop: %r" % stop) self.app_iter = app_iter self.app_iterator = iter(app_iter) self.start = start if stop is None: self.length = -1 else: self.length = stop - start if start: self._served = None else: self._served = 0 if hasattr(app_iter, 'close'): self.close = app_iter.close def __iter__(self): return self def next(self): if self._served is None: # Haven't served anything; need to skip some leading bytes skipped = 0 start = self.start while 1: chunk = self.app_iterator.next() skipped += len(chunk) extra = skipped - start if extra == 0: self._served = 0 break elif extra > 0: self._served = extra return chunk[-extra:] length = self.length if length is None: # Spent raise StopIteration chunk = self.app_iterator.next() if length == -1: return chunk if self._served + len(chunk) > length: extra = self._served + len(chunk) - length self.length = None return chunk[:-extra] self._served += len(chunk) return chunk class EmptyResponse(object): """An empty WSGI response. An iterator that immediately stops. Optionally provides a close method to close an underlying app_iter it replaces. """ def __init__(self, app_iter=None): if app_iter and hasattr(app_iter, 'close'): self.close = app_iter.close def __iter__(self): return self def __len__(self): return 0 def next(self): raise StopIteration()