docker setup

AdrienLSH committed 2023-11-23 16:43:30 +01:00
parent fd19180e1d
commit f29003c66a

5410 changed files with 869440 additions and 0 deletions

View File: django/http/__init__.py

@@ -0,0 +1,52 @@
from django.http.cookie import SimpleCookie, parse_cookie
from django.http.request import (
HttpHeaders,
HttpRequest,
QueryDict,
RawPostDataException,
UnreadablePostError,
)
from django.http.response import (
BadHeaderError,
FileResponse,
Http404,
HttpResponse,
HttpResponseBadRequest,
HttpResponseBase,
HttpResponseForbidden,
HttpResponseGone,
HttpResponseNotAllowed,
HttpResponseNotFound,
HttpResponseNotModified,
HttpResponsePermanentRedirect,
HttpResponseRedirect,
HttpResponseServerError,
JsonResponse,
StreamingHttpResponse,
)
__all__ = [
"SimpleCookie",
"parse_cookie",
"HttpHeaders",
"HttpRequest",
"QueryDict",
"RawPostDataException",
"UnreadablePostError",
"HttpResponse",
"HttpResponseBase",
"StreamingHttpResponse",
"HttpResponseRedirect",
"HttpResponsePermanentRedirect",
"HttpResponseNotModified",
"HttpResponseBadRequest",
"HttpResponseForbidden",
"HttpResponseNotFound",
"HttpResponseNotAllowed",
"HttpResponseGone",
"HttpResponseServerError",
"Http404",
"BadHeaderError",
"JsonResponse",
"FileResponse",
]
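Everything listed in __all__ is importable directly from django.http. A minimal view sketch using only these re-exports (illustrative names, not part of the commit):

from django.http import HttpResponseNotAllowed, JsonResponse

def ping(request):
    # Hypothetical view: reject non-GET methods, otherwise return JSON.
    if request.method != "GET":
        return HttpResponseNotAllowed(["GET"])
    return JsonResponse({"pong": True})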

View File: django/http/cookie.py

@@ -0,0 +1,23 @@
from http import cookies
# For backwards compatibility in Django 2.1.
SimpleCookie = cookies.SimpleCookie
def parse_cookie(cookie):
"""
Return a dictionary parsed from a `Cookie:` header string.
"""
cookiedict = {}
for chunk in cookie.split(";"):
if "=" in chunk:
key, val = chunk.split("=", 1)
else:
# Assume an empty name per
# https://bugzilla.mozilla.org/show_bug.cgi?id=169091
key, val = "", chunk
key, val = key.strip(), val.strip()
if key or val:
# unquote using Python's algorithm.
cookiedict[key] = cookies._unquote(val)
return cookiedict
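A quick illustration of the parsing rules above, including the empty-name case from the referenced Mozilla bug (illustrative values, not part of the commit):

parse_cookie("sessionid=abc123; csrftoken=xyz")
# -> {"sessionid": "abc123", "csrftoken": "xyz"}
parse_cookie("justavalue")
# -> {"": "justavalue"}  (no "=", so the name is assumed empty)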

View File: django/http/multipartparser.py

@@ -0,0 +1,743 @@
"""
Multi-part parsing for file uploads.
Exposes one class, ``MultiPartParser``, which feeds chunks of uploaded data to
file upload handlers for processing.
"""
import base64
import binascii
import collections
import html
from django.conf import settings
from django.core.exceptions import (
RequestDataTooBig,
SuspiciousMultipartForm,
TooManyFieldsSent,
TooManyFilesSent,
)
from django.core.files.uploadhandler import SkipFile, StopFutureHandlers, StopUpload
from django.utils.datastructures import MultiValueDict
from django.utils.encoding import force_str
from django.utils.http import parse_header_parameters
from django.utils.regex_helper import _lazy_re_compile
__all__ = ("MultiPartParser", "MultiPartParserError", "InputStreamExhausted")
class MultiPartParserError(Exception):
pass
class InputStreamExhausted(Exception):
"""
No more reads are allowed from this device.
"""
pass
RAW = "raw"
FILE = "file"
FIELD = "field"
FIELD_TYPES = frozenset([FIELD, RAW])
class MultiPartParser:
"""
An RFC 7578 multipart/form-data parser.
``MultiPartParser.parse()`` reads the input stream in ``chunk_size`` chunks
and returns a tuple of ``(MultiValueDict(POST), MultiValueDict(FILES))``.
"""
boundary_re = _lazy_re_compile(r"[ -~]{0,200}[!-~]")
def __init__(self, META, input_data, upload_handlers, encoding=None):
"""
Initialize the MultiPartParser object.
:META:
The standard ``META`` dictionary in Django request objects.
:input_data:
The raw post data, as a file-like object.
:upload_handlers:
A list of UploadHandler instances that perform operations on the
uploaded data.
:encoding:
The encoding with which to treat the incoming data.
"""
# Content-Type should contain multipart and the boundary information.
content_type = META.get("CONTENT_TYPE", "")
if not content_type.startswith("multipart/"):
raise MultiPartParserError("Invalid Content-Type: %s" % content_type)
try:
content_type.encode("ascii")
except UnicodeEncodeError:
raise MultiPartParserError(
"Invalid non-ASCII Content-Type in multipart: %s"
% force_str(content_type)
)
# Parse the header to get the boundary to split the parts.
_, opts = parse_header_parameters(content_type)
boundary = opts.get("boundary")
if not boundary or not self.boundary_re.fullmatch(boundary):
raise MultiPartParserError(
"Invalid boundary in multipart: %s" % force_str(boundary)
)
# Content-Length should contain the length of the body we are about
# to receive.
try:
content_length = int(META.get("CONTENT_LENGTH", 0))
except (ValueError, TypeError):
content_length = 0
if content_length < 0:
# This means we shouldn't continue; raise an error.
raise MultiPartParserError("Invalid content length: %r" % content_length)
self._boundary = boundary.encode("ascii")
self._input_data = input_data
# For compatibility with low-level network APIs (with 32-bit integers),
# the chunk size should be < 2^31, but still divisible by 4.
possible_sizes = [x.chunk_size for x in upload_handlers if x.chunk_size]
self._chunk_size = min([2**31 - 4] + possible_sizes)
self._meta = META
self._encoding = encoding or settings.DEFAULT_CHARSET
self._content_length = content_length
self._upload_handlers = upload_handlers
def parse(self):
# Call the actual parse routine and close all open files in case of
# errors. This is needed because if exceptions are thrown the
# MultiPartParser will not be garbage collected immediately and
# resources would be kept alive. This is only needed for errors because
# the Request object closes all uploaded files at the end of the
# request.
try:
return self._parse()
except Exception:
if hasattr(self, "_files"):
for _, files in self._files.lists():
for fileobj in files:
fileobj.close()
raise
def _parse(self):
"""
Parse the POST data and break it into a FILES MultiValueDict and a POST
MultiValueDict.
Return a tuple containing the POST and FILES dictionary, respectively.
"""
from django.http import QueryDict
encoding = self._encoding
handlers = self._upload_handlers
# The HTTP spec says that Content-Length >= 0 is valid, so handle
# content-length == 0 before continuing.
if self._content_length == 0:
return QueryDict(encoding=self._encoding), MultiValueDict()
# See if any of the handlers take care of the parsing.
# This allows overriding everything if need be.
for handler in handlers:
result = handler.handle_raw_input(
self._input_data,
self._meta,
self._content_length,
self._boundary,
encoding,
)
# Check to see if it was handled
if result is not None:
return result[0], result[1]
# Create the data structures to be used later.
self._post = QueryDict(mutable=True)
self._files = MultiValueDict()
# Instantiate the parser and stream:
stream = LazyStream(ChunkIter(self._input_data, self._chunk_size))
# Whether or not to signal a file-completion at the beginning of the loop.
old_field_name = None
counters = [0] * len(handlers)
# Number of bytes that have been read.
num_bytes_read = 0
# To count the number of keys in the request.
num_post_keys = 0
# To count the number of files in the request.
num_files = 0
# To limit the amount of data read from the request.
read_size = None
# Whether a file upload is finished.
uploaded_file = True
try:
for item_type, meta_data, field_stream in Parser(stream, self._boundary):
if old_field_name:
# We run this at the beginning of the next loop
# since we cannot be sure a file is complete until
# we hit the next boundary/part of the multipart content.
self.handle_file_complete(old_field_name, counters)
old_field_name = None
uploaded_file = True
if (
item_type in FIELD_TYPES
and settings.DATA_UPLOAD_MAX_NUMBER_FIELDS is not None
):
# Avoid storing more than DATA_UPLOAD_MAX_NUMBER_FIELDS.
num_post_keys += 1
# 2 accounts for empty raw fields before and after the
# last boundary.
if settings.DATA_UPLOAD_MAX_NUMBER_FIELDS + 2 < num_post_keys:
raise TooManyFieldsSent(
"The number of GET/POST parameters exceeded "
"settings.DATA_UPLOAD_MAX_NUMBER_FIELDS."
)
try:
disposition = meta_data["content-disposition"][1]
field_name = disposition["name"].strip()
except (KeyError, IndexError, AttributeError):
continue
transfer_encoding = meta_data.get("content-transfer-encoding")
if transfer_encoding is not None:
transfer_encoding = transfer_encoding[0].strip()
field_name = force_str(field_name, encoding, errors="replace")
if item_type == FIELD:
# Avoid reading more than DATA_UPLOAD_MAX_MEMORY_SIZE.
if settings.DATA_UPLOAD_MAX_MEMORY_SIZE is not None:
read_size = (
settings.DATA_UPLOAD_MAX_MEMORY_SIZE - num_bytes_read
)
# This is a post field, we can just set it in the post
if transfer_encoding == "base64":
raw_data = field_stream.read(size=read_size)
num_bytes_read += len(raw_data)
try:
data = base64.b64decode(raw_data)
except binascii.Error:
data = raw_data
else:
data = field_stream.read(size=read_size)
num_bytes_read += len(data)
# Add two here to make the check consistent with the
# x-www-form-urlencoded check that includes '&='.
num_bytes_read += len(field_name) + 2
if (
settings.DATA_UPLOAD_MAX_MEMORY_SIZE is not None
and num_bytes_read > settings.DATA_UPLOAD_MAX_MEMORY_SIZE
):
raise RequestDataTooBig(
"Request body exceeded "
"settings.DATA_UPLOAD_MAX_MEMORY_SIZE."
)
self._post.appendlist(
field_name, force_str(data, encoding, errors="replace")
)
elif item_type == FILE:
# Avoid storing more than DATA_UPLOAD_MAX_NUMBER_FILES.
num_files += 1
if (
settings.DATA_UPLOAD_MAX_NUMBER_FILES is not None
and num_files > settings.DATA_UPLOAD_MAX_NUMBER_FILES
):
raise TooManyFilesSent(
"The number of files exceeded "
"settings.DATA_UPLOAD_MAX_NUMBER_FILES."
)
# This is a file, use the handler...
file_name = disposition.get("filename")
if file_name:
file_name = force_str(file_name, encoding, errors="replace")
file_name = self.sanitize_file_name(file_name)
if not file_name:
continue
content_type, content_type_extra = meta_data.get(
"content-type", ("", {})
)
content_type = content_type.strip()
charset = content_type_extra.get("charset")
try:
content_length = int(meta_data.get("content-length")[0])
except (IndexError, TypeError, ValueError):
content_length = None
counters = [0] * len(handlers)
uploaded_file = False
try:
for handler in handlers:
try:
handler.new_file(
field_name,
file_name,
content_type,
content_length,
charset,
content_type_extra,
)
except StopFutureHandlers:
break
for chunk in field_stream:
if transfer_encoding == "base64":
# We only special-case base64 transfer encoding
# We should always decode base64 chunks by
# multiple of 4, ignoring whitespace.
stripped_chunk = b"".join(chunk.split())
remaining = len(stripped_chunk) % 4
while remaining != 0:
over_chunk = field_stream.read(4 - remaining)
if not over_chunk:
break
stripped_chunk += b"".join(over_chunk.split())
remaining = len(stripped_chunk) % 4
try:
chunk = base64.b64decode(stripped_chunk)
except Exception as exc:
# Since this is only a chunk, any error is
# an unfixable error.
raise MultiPartParserError(
"Could not decode base64 data."
) from exc
for i, handler in enumerate(handlers):
chunk_length = len(chunk)
chunk = handler.receive_data_chunk(chunk, counters[i])
counters[i] += chunk_length
if chunk is None:
# Don't continue if the chunk received by
# the handler is None.
break
except SkipFile:
self._close_files()
# Just use up the rest of this file...
exhaust(field_stream)
else:
# Handle file upload completions on next iteration.
old_field_name = field_name
else:
# If this is neither a FIELD nor a FILE, exhaust the field
# stream. Note: There could be an error here at some point,
# but there will be at least two RAW types (before and
# after the other boundaries). This branch is usually not
# reached at all, because a missing content-disposition
# header will skip the whole boundary.
exhaust(field_stream)
except StopUpload as e:
self._close_files()
if not e.connection_reset:
exhaust(self._input_data)
else:
if not uploaded_file:
for handler in handlers:
handler.upload_interrupted()
# Make sure that the request data is all fed
exhaust(self._input_data)
# Signal that the upload has completed.
# any() short-circuits if a handler's upload_complete() returns a value.
any(handler.upload_complete() for handler in handlers)
self._post._mutable = False
return self._post, self._files
def handle_file_complete(self, old_field_name, counters):
"""
Handle all the signaling that takes place when a file is complete.
"""
for i, handler in enumerate(self._upload_handlers):
file_obj = handler.file_complete(counters[i])
if file_obj:
# If it returns a file object, then set the files dict.
self._files.appendlist(
force_str(old_field_name, self._encoding, errors="replace"),
file_obj,
)
break
def sanitize_file_name(self, file_name):
"""
Sanitize the filename of an upload.
Remove all possible path separators, even though that might remove more
than actually required by the target system. Filenames that could
potentially cause problems (current/parent dir) are also discarded.
It should be noted that this function could still return a "filepath"
like "C:some_file.txt" which is handled later on by the storage layer.
So while this function does sanitize filenames to some extent, the
resulting filename should still be considered as untrusted user input.
"""
file_name = html.unescape(file_name)
file_name = file_name.rsplit("/")[-1]
file_name = file_name.rsplit("\\")[-1]
# Remove non-printable characters.
file_name = "".join([char for char in file_name if char.isprintable()])
if file_name in {"", ".", ".."}:
return None
return file_name
IE_sanitize = sanitize_file_name
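    # Illustrative examples (not part of Django's source): path components
    # and traversal names are dropped.
    #   sanitize_file_name("../../etc/passwd")  -> "passwd"
    #   sanitize_file_name("..")                -> None
    #   sanitize_file_name("C:\\tmp\\evil.txt") -> "evil.txt"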
def _close_files(self):
# Free up all file handles.
# FIXME: this currently assumes that upload handlers store the file as 'file'
# We should document that...
# (Maybe add handler.free_file to complement new_file)
for handler in self._upload_handlers:
if hasattr(handler, "file"):
handler.file.close()
class LazyStream:
"""
The LazyStream wrapper allows one to get and "unget" bytes from a stream.
Given a producer object (an iterator that yields bytestrings), the
LazyStream object will support iteration, reading, and keeping a "look-back"
variable in case you need to "unget" some bytes.
"""
def __init__(self, producer, length=None):
"""
Every LazyStream must have a producer when instantiated.
A producer is an iterator that yields a bytestring each time it
is advanced.
"""
self._producer = producer
self._empty = False
self._leftover = b""
self.length = length
self.position = 0
self._remaining = length
self._unget_history = []
def tell(self):
return self.position
def read(self, size=None):
def parts():
remaining = self._remaining if size is None else size
# do the whole thing in one shot if no limit was provided.
if remaining is None:
yield b"".join(self)
return
# otherwise do some bookkeeping to return exactly enough
# of the stream and stashing any extra content we get from
# the producer
while remaining != 0:
assert remaining > 0, "remaining bytes to read should never go negative"
try:
chunk = next(self)
except StopIteration:
return
else:
emitting = chunk[:remaining]
self.unget(chunk[remaining:])
remaining -= len(emitting)
yield emitting
return b"".join(parts())
def __next__(self):
"""
Used when the exact number of bytes to read is unimportant.
Return whatever chunk is conveniently returned from the iterator.
Useful to avoid unnecessary bookkeeping if performance is an issue.
"""
if self._leftover:
output = self._leftover
self._leftover = b""
else:
output = next(self._producer)
self._unget_history = []
self.position += len(output)
return output
def close(self):
"""
Used to invalidate/disable this lazy stream.
Replace the producer with an empty list. Any leftover bytes that have
already been read will still be reported upon read() and/or next().
"""
self._producer = []
def __iter__(self):
return self
def unget(self, bytes):
"""
Place bytes back onto the front of the lazy stream.
Future calls to read() will return those bytes first. The
stream position and thus tell() will be rewound.
"""
if not bytes:
return
self._update_unget_history(len(bytes))
self.position -= len(bytes)
self._leftover = bytes + self._leftover
def _update_unget_history(self, num_bytes):
"""
Update the unget history as a sanity check to see if we've pushed
back the same number of bytes in one chunk. If we keep ungetting the
same number of bytes many times (here, 50), we're most likely in an
infinite loop of some sort. This is usually caused by a
maliciously-malformed MIME request.
"""
self._unget_history = [num_bytes] + self._unget_history[:49]
number_equal = len(
[
current_number
for current_number in self._unget_history
if current_number == num_bytes
]
)
if number_equal > 40:
raise SuspiciousMultipartForm(
"The multipart parser got stuck, which shouldn't happen with"
" normal uploaded files. Check for malicious upload activity;"
" if there is none, report this to the Django developers."
)
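# Illustrative example of the unget() contract (not part of Django's source):
#
#   stream = LazyStream(iter([b"abcdef"]))
#   stream.read(4)       # b"abcd"; tell() == 4
#   stream.unget(b"cd")  # rewinds: tell() == 2
#   stream.read()        # b"cdef"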
class ChunkIter:
"""
An iterable that will yield chunks of data. Given a file-like object to the
constructor, yield chunks from successive read() calls on that object.
"""
def __init__(self, flo, chunk_size=64 * 1024):
self.flo = flo
self.chunk_size = chunk_size
def __next__(self):
try:
data = self.flo.read(self.chunk_size)
except InputStreamExhausted:
raise StopIteration()
if data:
return data
else:
raise StopIteration()
def __iter__(self):
return self
class InterBoundaryIter:
"""
A Producer that will iterate over boundaries.
"""
def __init__(self, stream, boundary):
self._stream = stream
self._boundary = boundary
def __iter__(self):
return self
def __next__(self):
try:
return LazyStream(BoundaryIter(self._stream, self._boundary))
except InputStreamExhausted:
raise StopIteration()
class BoundaryIter:
"""
A Producer that is sensitive to boundaries.
Will happily yield bytes until a boundary is found. Will yield the bytes
before the boundary, throw away the boundary bytes themselves, and push the
post-boundary bytes back on the stream.
Subsequent calls to next() after the boundary is located will raise a
StopIteration exception.
"""
def __init__(self, stream, boundary):
self._stream = stream
self._boundary = boundary
self._done = False
# rollback an additional six bytes because the format is like
# this: CRLF<boundary>[--CRLF]
self._rollback = len(boundary) + 6
# Peek at one byte to make sure the stream isn't already exhausted,
# then put it back.
unused_char = self._stream.read(1)
if not unused_char:
raise InputStreamExhausted()
self._stream.unget(unused_char)
def __iter__(self):
return self
def __next__(self):
if self._done:
raise StopIteration()
stream = self._stream
rollback = self._rollback
bytes_read = 0
chunks = []
for bytes in stream:
bytes_read += len(bytes)
chunks.append(bytes)
if bytes_read > rollback:
break
if not bytes:
break
else:
self._done = True
if not chunks:
raise StopIteration()
chunk = b"".join(chunks)
boundary = self._find_boundary(chunk)
if boundary:
end, next = boundary
stream.unget(chunk[next:])
self._done = True
return chunk[:end]
else:
# make sure we don't treat a partial boundary (and
# its separators) as data
if not chunk[:-rollback]: # and len(chunk) >= (len(self._boundary) + 6):
# There's nothing left, we should just return and mark as done.
self._done = True
return chunk
else:
stream.unget(chunk[-rollback:])
return chunk[:-rollback]
def _find_boundary(self, data):
"""
Find a multipart boundary in data.
Should no boundary exist in the data, return None. Otherwise, return
a tuple containing the indices of the following:
* the end of current encapsulation
* the start of the next encapsulation
"""
index = data.find(self._boundary)
if index < 0:
return None
else:
end = index
next = index + len(self._boundary)
# backup over CRLF
last = max(0, end - 1)
if data[last : last + 1] == b"\n":
end -= 1
last = max(0, end - 1)
if data[last : last + 1] == b"\r":
end -= 1
return end, next
def exhaust(stream_or_iterable):
"""Exhaust an iterator or stream."""
try:
iterator = iter(stream_or_iterable)
except TypeError:
iterator = ChunkIter(stream_or_iterable, 16384)
collections.deque(iterator, maxlen=0) # consume iterator quickly.
def parse_boundary_stream(stream, max_header_size):
"""
Parse one and exactly one stream that encapsulates a boundary.
"""
# Stream at beginning of header, look for end of header
# and parse it if found. The header must fit within one
# chunk.
chunk = stream.read(max_header_size)
# find() returns the index where these four bytes start, so we'll
# need to munch them later to prevent them from polluting
# the payload.
header_end = chunk.find(b"\r\n\r\n")
if header_end == -1:
# we find no header, so we just mark this fact and pass on
# the stream verbatim
stream.unget(chunk)
return (RAW, {}, stream)
header = chunk[:header_end]
# here we place any excess chunk back onto the stream, as
# well as throwing away the CRLFCRLF bytes from above.
stream.unget(chunk[header_end + 4 :])
TYPE = RAW
outdict = {}
# Eliminate blank lines
for line in header.split(b"\r\n"):
# This terminology ("main value" and "dictionary of
# parameters") is from the Python docs.
try:
main_value_pair, params = parse_header_parameters(line.decode())
name, value = main_value_pair.split(":", 1)
params = {k: v.encode() for k, v in params.items()}
except ValueError: # Invalid header.
continue
if name == "content-disposition":
TYPE = FIELD
if params.get("filename"):
TYPE = FILE
outdict[name] = value, params
if TYPE == RAW:
stream.unget(chunk)
return (TYPE, outdict, stream)
class Parser:
def __init__(self, stream, boundary):
self._stream = stream
self._separator = b"--" + boundary
def __iter__(self):
boundarystream = InterBoundaryIter(self._stream, self._separator)
for sub_stream in boundarystream:
# Iterate over each part
yield parse_boundary_stream(sub_stream, 1024)
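To make the flow above concrete, here is a minimal sketch (not from the commit) of driving the parser by hand with a made-up boundary and field name; it assumes Django settings are available, hence the settings.configure() stand-in:

import io

from django.conf import settings

settings.configure()  # stand-in for a real project's settings module

from django.core.files.uploadhandler import MemoryFileUploadHandler
from django.http.multipartparser import MultiPartParser

body = (
    b"--frontier\r\n"
    b'Content-Disposition: form-data; name="title"\r\n'
    b"\r\n"
    b"hello\r\n"
    b"--frontier--\r\n"
)
meta = {
    "CONTENT_TYPE": "multipart/form-data; boundary=frontier",
    "CONTENT_LENGTH": str(len(body)),
}
# parse() returns (QueryDict, MultiValueDict), i.e. POST and FILES.
post, files = MultiPartParser(
    meta, io.BytesIO(body), [MemoryFileUploadHandler()]
).parse()
assert post["title"] == "hello"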

View File: django/http/request.py

@@ -0,0 +1,739 @@
import codecs
import copy
from io import BytesIO
from itertools import chain
from urllib.parse import parse_qsl, quote, urlencode, urljoin, urlsplit
from django.conf import settings
from django.core import signing
from django.core.exceptions import (
DisallowedHost,
ImproperlyConfigured,
RequestDataTooBig,
TooManyFieldsSent,
)
from django.core.files import uploadhandler
from django.http.multipartparser import (
MultiPartParser,
MultiPartParserError,
TooManyFilesSent,
)
from django.utils.datastructures import (
CaseInsensitiveMapping,
ImmutableList,
MultiValueDict,
)
from django.utils.encoding import escape_uri_path, iri_to_uri
from django.utils.functional import cached_property
from django.utils.http import is_same_domain, parse_header_parameters
from django.utils.regex_helper import _lazy_re_compile
RAISE_ERROR = object()
host_validation_re = _lazy_re_compile(
r"^([a-z0-9.-]+|\[[a-f0-9]*:[a-f0-9\.:]+\])(:[0-9]+)?$"
)
class UnreadablePostError(OSError):
pass
class RawPostDataException(Exception):
"""
You cannot access raw_post_data from a request that has
multipart/* POST data if it has been accessed via POST,
FILES, etc.
"""
pass
class HttpRequest:
"""A basic HTTP request."""
# The encoding used in GET/POST dicts. None means use default setting.
_encoding = None
_upload_handlers = []
def __init__(self):
# WARNING: The `WSGIRequest` subclass doesn't call `super`.
# Any variable assignment made here should also happen in
# `WSGIRequest.__init__()`.
self.GET = QueryDict(mutable=True)
self.POST = QueryDict(mutable=True)
self.COOKIES = {}
self.META = {}
self.FILES = MultiValueDict()
self.path = ""
self.path_info = ""
self.method = None
self.resolver_match = None
self.content_type = None
self.content_params = None
def __repr__(self):
if self.method is None or not self.get_full_path():
return "<%s>" % self.__class__.__name__
return "<%s: %s %r>" % (
self.__class__.__name__,
self.method,
self.get_full_path(),
)
@cached_property
def headers(self):
return HttpHeaders(self.META)
@cached_property
def accepted_types(self):
"""Return a list of MediaType instances."""
return parse_accept_header(self.headers.get("Accept", "*/*"))
def accepts(self, media_type):
return any(
accepted_type.match(media_type) for accepted_type in self.accepted_types
)
def _set_content_type_params(self, meta):
"""Set content_type, content_params, and encoding."""
self.content_type, self.content_params = parse_header_parameters(
meta.get("CONTENT_TYPE", "")
)
if "charset" in self.content_params:
try:
codecs.lookup(self.content_params["charset"])
except LookupError:
pass
else:
self.encoding = self.content_params["charset"]
def _get_raw_host(self):
"""
Return the HTTP host using the environment or request headers. Skip
allowed hosts protection, so may return an insecure host.
"""
# We try three options, in order of decreasing preference.
if settings.USE_X_FORWARDED_HOST and ("HTTP_X_FORWARDED_HOST" in self.META):
host = self.META["HTTP_X_FORWARDED_HOST"]
elif "HTTP_HOST" in self.META:
host = self.META["HTTP_HOST"]
else:
# Reconstruct the host using the algorithm from PEP 333.
host = self.META["SERVER_NAME"]
server_port = self.get_port()
if server_port != ("443" if self.is_secure() else "80"):
host = "%s:%s" % (host, server_port)
return host
def get_host(self):
"""Return the HTTP host using the environment or request headers."""
host = self._get_raw_host()
# Allow variants of localhost if ALLOWED_HOSTS is empty and DEBUG=True.
allowed_hosts = settings.ALLOWED_HOSTS
if settings.DEBUG and not allowed_hosts:
allowed_hosts = [".localhost", "127.0.0.1", "[::1]"]
domain, port = split_domain_port(host)
if domain and validate_host(domain, allowed_hosts):
return host
else:
msg = "Invalid HTTP_HOST header: %r." % host
if domain:
msg += " You may need to add %r to ALLOWED_HOSTS." % domain
else:
msg += (
" The domain name provided is not valid according to RFC 1034/1035."
)
raise DisallowedHost(msg)
def get_port(self):
"""Return the port number for the request as a string."""
if settings.USE_X_FORWARDED_PORT and "HTTP_X_FORWARDED_PORT" in self.META:
port = self.META["HTTP_X_FORWARDED_PORT"]
else:
port = self.META["SERVER_PORT"]
return str(port)
def get_full_path(self, force_append_slash=False):
return self._get_full_path(self.path, force_append_slash)
def get_full_path_info(self, force_append_slash=False):
return self._get_full_path(self.path_info, force_append_slash)
def _get_full_path(self, path, force_append_slash):
# RFC 3986 requires query string arguments to be in the ASCII range.
# Rather than crash if this doesn't happen, we encode defensively.
return "%s%s%s" % (
escape_uri_path(path),
"/" if force_append_slash and not path.endswith("/") else "",
("?" + iri_to_uri(self.META.get("QUERY_STRING", "")))
if self.META.get("QUERY_STRING", "")
else "",
)
def get_signed_cookie(self, key, default=RAISE_ERROR, salt="", max_age=None):
"""
Attempt to return a signed cookie. If the signature fails or the
cookie has expired, raise an exception, unless the `default` argument
is provided, in which case return that value.
"""
try:
cookie_value = self.COOKIES[key]
except KeyError:
if default is not RAISE_ERROR:
return default
else:
raise
try:
value = signing.get_cookie_signer(salt=key + salt).unsign(
cookie_value, max_age=max_age
)
except signing.BadSignature:
if default is not RAISE_ERROR:
return default
else:
raise
return value
def build_absolute_uri(self, location=None):
"""
Build an absolute URI from the location and the variables available in
this request. If no ``location`` is specified, build the absolute URI
using request.get_full_path(). If the location is absolute, convert it
to an RFC 3987 compliant URI and return it. If location is relative or
is scheme-relative (i.e., ``//example.com/``), urljoin() it to a base
URL constructed from the request variables.
"""
if location is None:
# Make it an absolute url (but schemeless and domainless) for the
# edge case that the path starts with '//'.
location = "//%s" % self.get_full_path()
else:
# Coerce lazy locations.
location = str(location)
bits = urlsplit(location)
if not (bits.scheme and bits.netloc):
# Handle the simple, most common case. If the location is absolute
# and a scheme or host (netloc) isn't provided, skip an expensive
# urljoin() as long as no path segments are '.' or '..'.
if (
bits.path.startswith("/")
and not bits.scheme
and not bits.netloc
and "/./" not in bits.path
and "/../" not in bits.path
):
# If location starts with '//' but has no netloc, reuse the
# scheme and netloc from the current request. Strip the double
# slashes and continue as if it wasn't specified.
if location.startswith("//"):
location = location[2:]
location = self._current_scheme_host + location
else:
# Join the constructed URL with the provided location, which
# allows the provided location to apply query strings to the
# base path.
location = urljoin(self._current_scheme_host + self.path, location)
return iri_to_uri(location)
@cached_property
def _current_scheme_host(self):
return "{}://{}".format(self.scheme, self.get_host())
def _get_scheme(self):
"""
Hook for subclasses like WSGIRequest to implement. Return 'http' by
default.
"""
return "http"
@property
def scheme(self):
if settings.SECURE_PROXY_SSL_HEADER:
try:
header, secure_value = settings.SECURE_PROXY_SSL_HEADER
except ValueError:
raise ImproperlyConfigured(
"The SECURE_PROXY_SSL_HEADER setting must be a tuple containing "
"two values."
)
header_value = self.META.get(header)
if header_value is not None:
header_value, *_ = header_value.split(",", 1)
return "https" if header_value.strip() == secure_value else "http"
return self._get_scheme()
def is_secure(self):
return self.scheme == "https"
@property
def encoding(self):
return self._encoding
@encoding.setter
def encoding(self, val):
"""
Set the encoding used for GET/POST accesses. If the GET or POST
dictionary has already been created, remove and recreate it on the
next access (so that it is decoded correctly).
"""
self._encoding = val
if hasattr(self, "GET"):
del self.GET
if hasattr(self, "_post"):
del self._post
def _initialize_handlers(self):
self._upload_handlers = [
uploadhandler.load_handler(handler, self)
for handler in settings.FILE_UPLOAD_HANDLERS
]
@property
def upload_handlers(self):
if not self._upload_handlers:
# If there are no upload handlers defined, initialize them from settings.
self._initialize_handlers()
return self._upload_handlers
@upload_handlers.setter
def upload_handlers(self, upload_handlers):
if hasattr(self, "_files"):
raise AttributeError(
"You cannot set the upload handlers after the upload has been "
"processed."
)
self._upload_handlers = upload_handlers
def parse_file_upload(self, META, post_data):
"""Return a tuple of (POST QueryDict, FILES MultiValueDict)."""
self.upload_handlers = ImmutableList(
self.upload_handlers,
warning=(
"You cannot alter upload handlers after the upload has been "
"processed."
),
)
parser = MultiPartParser(META, post_data, self.upload_handlers, self.encoding)
return parser.parse()
@property
def body(self):
if not hasattr(self, "_body"):
if self._read_started:
raise RawPostDataException(
"You cannot access body after reading from request's data stream"
)
# Limit the maximum request data size that will be handled in-memory.
if (
settings.DATA_UPLOAD_MAX_MEMORY_SIZE is not None
and int(self.META.get("CONTENT_LENGTH") or 0)
> settings.DATA_UPLOAD_MAX_MEMORY_SIZE
):
raise RequestDataTooBig(
"Request body exceeded settings.DATA_UPLOAD_MAX_MEMORY_SIZE."
)
try:
self._body = self.read()
except OSError as e:
raise UnreadablePostError(*e.args) from e
finally:
self._stream.close()
self._stream = BytesIO(self._body)
return self._body
def _mark_post_parse_error(self):
self._post = QueryDict()
self._files = MultiValueDict()
def _load_post_and_files(self):
"""Populate self._post and self._files if the content-type is a form type"""
if self.method != "POST":
self._post, self._files = (
QueryDict(encoding=self._encoding),
MultiValueDict(),
)
return
if self._read_started and not hasattr(self, "_body"):
self._mark_post_parse_error()
return
if self.content_type == "multipart/form-data":
if hasattr(self, "_body"):
# Use already read data
data = BytesIO(self._body)
else:
data = self
try:
self._post, self._files = self.parse_file_upload(self.META, data)
except (MultiPartParserError, TooManyFilesSent):
# An error occurred while parsing POST data. Since when
# formatting the error the request handler might access
# self.POST, set self._post and self._files to prevent
# attempts to parse POST data again.
self._mark_post_parse_error()
raise
elif self.content_type == "application/x-www-form-urlencoded":
self._post, self._files = (
QueryDict(self.body, encoding=self._encoding),
MultiValueDict(),
)
else:
self._post, self._files = (
QueryDict(encoding=self._encoding),
MultiValueDict(),
)
def close(self):
if hasattr(self, "_files"):
for f in chain.from_iterable(list_[1] for list_ in self._files.lists()):
f.close()
# File-like and iterator interface.
#
# Expects self._stream to be set to an appropriate source of bytes by
# a corresponding request subclass (e.g. WSGIRequest).
# Also when request data has already been read by request.POST or
# request.body, self._stream points to a BytesIO instance
# containing that data.
def read(self, *args, **kwargs):
self._read_started = True
try:
return self._stream.read(*args, **kwargs)
except OSError as e:
raise UnreadablePostError(*e.args) from e
def readline(self, *args, **kwargs):
self._read_started = True
try:
return self._stream.readline(*args, **kwargs)
except OSError as e:
raise UnreadablePostError(*e.args) from e
def __iter__(self):
return iter(self.readline, b"")
def readlines(self):
return list(self)
class HttpHeaders(CaseInsensitiveMapping):
HTTP_PREFIX = "HTTP_"
# PEP 333 gives two headers which aren't prepended with HTTP_.
UNPREFIXED_HEADERS = {"CONTENT_TYPE", "CONTENT_LENGTH"}
def __init__(self, environ):
headers = {}
for header, value in environ.items():
name = self.parse_header_name(header)
if name:
headers[name] = value
super().__init__(headers)
def __getitem__(self, key):
"""Allow header lookup using underscores in place of hyphens."""
return super().__getitem__(key.replace("_", "-"))
@classmethod
def parse_header_name(cls, header):
if header.startswith(cls.HTTP_PREFIX):
header = header[len(cls.HTTP_PREFIX) :]
elif header not in cls.UNPREFIXED_HEADERS:
return None
return header.replace("_", "-").title()
@classmethod
def to_wsgi_name(cls, header):
header = header.replace("-", "_").upper()
if header in cls.UNPREFIXED_HEADERS:
return header
return f"{cls.HTTP_PREFIX}{header}"
@classmethod
def to_asgi_name(cls, header):
return header.replace("-", "_").upper()
@classmethod
def to_wsgi_names(cls, headers):
return {
cls.to_wsgi_name(header_name): value
for header_name, value in headers.items()
}
@classmethod
def to_asgi_names(cls, headers):
return {
cls.to_asgi_name(header_name): value
for header_name, value in headers.items()
}
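# Illustrative mapping (not part of Django's source):
#   HttpHeaders({"HTTP_X_REQUEST_ID": "abc", "CONTENT_TYPE": "text/plain"})
#   behaves like {"X-Request-Id": "abc", "Content-Type": "text/plain"},
#   and HttpHeaders.to_wsgi_name("X-Request-Id") == "HTTP_X_REQUEST_ID".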
class QueryDict(MultiValueDict):
"""
A specialized MultiValueDict which represents a query string.
A QueryDict can be used to represent GET or POST data. It subclasses
MultiValueDict since keys in such data can be repeated, for instance
in the data from a form with a <select multiple> field.
By default QueryDicts are immutable, though the copy() method
will always return a mutable copy.
Both keys and values set on this class are converted from the given encoding
(DEFAULT_CHARSET by default) to str.
"""
# These are both reset in __init__, but are specified here at the class
# level so that unpickling will have valid values.
_mutable = True
_encoding = None
def __init__(self, query_string=None, mutable=False, encoding=None):
super().__init__()
self.encoding = encoding or settings.DEFAULT_CHARSET
query_string = query_string or ""
parse_qsl_kwargs = {
"keep_blank_values": True,
"encoding": self.encoding,
"max_num_fields": settings.DATA_UPLOAD_MAX_NUMBER_FIELDS,
}
if isinstance(query_string, bytes):
# query_string normally contains URL-encoded data, a subset of ASCII.
try:
query_string = query_string.decode(self.encoding)
except UnicodeDecodeError:
# ... but some user agents are misbehaving :-(
query_string = query_string.decode("iso-8859-1")
try:
for key, value in parse_qsl(query_string, **parse_qsl_kwargs):
self.appendlist(key, value)
except ValueError as e:
# ValueError can also be raised if the strict_parsing argument to
# parse_qsl() is True. As that is not used by Django, assume that
# the exception was raised by exceeding the value of max_num_fields
# instead of fragile checks of exception message strings.
raise TooManyFieldsSent(
"The number of GET/POST parameters exceeded "
"settings.DATA_UPLOAD_MAX_NUMBER_FIELDS."
) from e
self._mutable = mutable
@classmethod
def fromkeys(cls, iterable, value="", mutable=False, encoding=None):
"""
Return a new QueryDict with keys (may be repeated) from an iterable and
values from value.
"""
q = cls("", mutable=True, encoding=encoding)
for key in iterable:
q.appendlist(key, value)
if not mutable:
q._mutable = False
return q
@property
def encoding(self):
if self._encoding is None:
self._encoding = settings.DEFAULT_CHARSET
return self._encoding
@encoding.setter
def encoding(self, value):
self._encoding = value
def _assert_mutable(self):
if not self._mutable:
raise AttributeError("This QueryDict instance is immutable")
def __setitem__(self, key, value):
self._assert_mutable()
key = bytes_to_text(key, self.encoding)
value = bytes_to_text(value, self.encoding)
super().__setitem__(key, value)
def __delitem__(self, key):
self._assert_mutable()
super().__delitem__(key)
def __copy__(self):
result = self.__class__("", mutable=True, encoding=self.encoding)
for key, value in self.lists():
result.setlist(key, value)
return result
def __deepcopy__(self, memo):
result = self.__class__("", mutable=True, encoding=self.encoding)
memo[id(self)] = result
for key, value in self.lists():
result.setlist(copy.deepcopy(key, memo), copy.deepcopy(value, memo))
return result
def setlist(self, key, list_):
self._assert_mutable()
key = bytes_to_text(key, self.encoding)
list_ = [bytes_to_text(elt, self.encoding) for elt in list_]
super().setlist(key, list_)
def setlistdefault(self, key, default_list=None):
self._assert_mutable()
return super().setlistdefault(key, default_list)
def appendlist(self, key, value):
self._assert_mutable()
key = bytes_to_text(key, self.encoding)
value = bytes_to_text(value, self.encoding)
super().appendlist(key, value)
def pop(self, key, *args):
self._assert_mutable()
return super().pop(key, *args)
def popitem(self):
self._assert_mutable()
return super().popitem()
def clear(self):
self._assert_mutable()
super().clear()
def setdefault(self, key, default=None):
self._assert_mutable()
key = bytes_to_text(key, self.encoding)
default = bytes_to_text(default, self.encoding)
return super().setdefault(key, default)
def copy(self):
"""Return a mutable copy of this object."""
return self.__deepcopy__({})
def urlencode(self, safe=None):
"""
Return an encoded string of all query string arguments.
`safe` specifies characters which don't require quoting, for example::
>>> q = QueryDict(mutable=True)
>>> q['next'] = '/a&b/'
>>> q.urlencode()
'next=%2Fa%26b%2F'
>>> q.urlencode(safe='/')
'next=/a%26b/'
"""
output = []
if safe:
safe = safe.encode(self.encoding)
def encode(k, v):
return "%s=%s" % ((quote(k, safe), quote(v, safe)))
else:
def encode(k, v):
return urlencode({k: v})
for k, list_ in self.lists():
output.extend(
encode(k.encode(self.encoding), str(v).encode(self.encoding))
for v in list_
)
return "&".join(output)
class MediaType:
def __init__(self, media_type_raw_line):
full_type, self.params = parse_header_parameters(
media_type_raw_line if media_type_raw_line else ""
)
self.main_type, _, self.sub_type = full_type.partition("/")
def __str__(self):
params_str = "".join("; %s=%s" % (k, v) for k, v in self.params.items())
return "%s%s%s" % (
self.main_type,
("/%s" % self.sub_type) if self.sub_type else "",
params_str,
)
def __repr__(self):
return "<%s: %s>" % (self.__class__.__qualname__, self)
@property
def is_all_types(self):
return self.main_type == "*" and self.sub_type == "*"
def match(self, other):
if self.is_all_types:
return True
other = MediaType(other)
if self.main_type == other.main_type and self.sub_type in {"*", other.sub_type}:
return True
return False
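# Illustrative matches (not part of Django's source):
#   MediaType("*/*").match("text/html")              -> True
#   MediaType("text/*").match("text/plain")          -> True
#   MediaType("application/json").match("text/html") -> False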
# It's neither necessary nor appropriate to use
# django.utils.encoding.force_str() for parsing URLs and form inputs. Thus,
# this slightly more restricted function, used by QueryDict.
def bytes_to_text(s, encoding):
"""
Convert bytes objects to strings, using the given encoding. Illegally
encoded input characters are replaced with Unicode "unknown" codepoint
(\ufffd).
Return any non-bytes objects without change.
"""
if isinstance(s, bytes):
return str(s, encoding, "replace")
else:
return s
def split_domain_port(host):
"""
Return a (domain, port) tuple from a given host.
Returned domain is lowercased. If the host is invalid, the domain will be
empty.
"""
host = host.lower()
if not host_validation_re.match(host):
return "", ""
if host[-1] == "]":
# It's an IPv6 address without a port.
return host, ""
bits = host.rsplit(":", 1)
domain, port = bits if len(bits) == 2 else (bits[0], "")
# Remove a trailing dot (if present) from the domain.
domain = domain[:-1] if domain.endswith(".") else domain
return domain, port
def validate_host(host, allowed_hosts):
"""
Validate the given host for this site.
Check that the host looks valid and matches a host or host pattern in the
given list of ``allowed_hosts``. Any pattern beginning with a period
matches a domain and all its subdomains (e.g. ``.example.com`` matches
``example.com`` and any subdomain), ``*`` matches anything, and anything
else must match exactly.
Note: This function assumes that the given host is lowercased and has
already had the port, if any, stripped off.
Return ``True`` for a valid host, ``False`` otherwise.
"""
return any(
pattern == "*" or is_same_domain(host, pattern) for pattern in allowed_hosts
)
def parse_accept_header(header):
return [MediaType(token) for token in header.split(",") if token.strip()]
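A worked example of the QueryDict access rules above (not from the commit; assumes configured settings, since parsing consults DEFAULT_CHARSET and DATA_UPLOAD_MAX_NUMBER_FIELDS):

from django.conf import settings

settings.configure()  # stand-in for a real project's settings module

from django.http import QueryDict

q = QueryDict("a=1&a=2&b=3")
q["a"]          # "2" (item access returns the last value for a repeated key)
q.getlist("a")  # ["1", "2"]
q.urlencode()   # "a=1&a=2&b=3"
mutable = q.copy()  # copy() always returns a mutable QueryDict
mutable["c"] = "4"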

View File: django/http/response.py

@@ -0,0 +1,732 @@
import datetime
import io
import json
import mimetypes
import os
import re
import sys
import time
import warnings
from email.header import Header
from http.client import responses
from urllib.parse import urlparse
from asgiref.sync import async_to_sync, sync_to_async
from django.conf import settings
from django.core import signals, signing
from django.core.exceptions import DisallowedRedirect
from django.core.serializers.json import DjangoJSONEncoder
from django.http.cookie import SimpleCookie
from django.utils import timezone
from django.utils.datastructures import CaseInsensitiveMapping
from django.utils.encoding import iri_to_uri
from django.utils.http import content_disposition_header, http_date
from django.utils.regex_helper import _lazy_re_compile
_charset_from_content_type_re = _lazy_re_compile(
r";\s*charset=(?P<charset>[^\s;]+)", re.I
)
class ResponseHeaders(CaseInsensitiveMapping):
def __init__(self, data):
"""
Populate the initial data using __setitem__ to ensure values are
correctly encoded.
"""
self._store = {}
if data:
for header, value in self._unpack_items(data):
self[header] = value
def _convert_to_charset(self, value, charset, mime_encode=False):
"""
Convert headers key/value to ascii/latin-1 native strings.
`charset` must be 'ascii' or 'latin-1'. If `mime_encode` is True and
`value` can't be represented in the given charset, apply MIME-encoding.
"""
try:
if isinstance(value, str):
# Ensure string is valid in given charset
value.encode(charset)
elif isinstance(value, bytes):
# Convert bytestring using given charset
value = value.decode(charset)
else:
value = str(value)
# Ensure string is valid in given charset.
value.encode(charset)
if "\n" in value or "\r" in value:
raise BadHeaderError(
f"Header values can't contain newlines (got {value!r})"
)
except UnicodeError as e:
# Encoding to a string of the specified charset failed, but we
# don't know what type that value was, or if it contains newlines,
# which we may need to check for before sending it to be
# encoded for multiple character sets.
if (isinstance(value, bytes) and (b"\n" in value or b"\r" in value)) or (
isinstance(value, str) and ("\n" in value or "\r" in value)
):
raise BadHeaderError(
f"Header values can't contain newlines (got {value!r})"
) from e
if mime_encode:
value = Header(value, "utf-8", maxlinelen=sys.maxsize).encode()
else:
e.reason += ", HTTP response headers must be in %s format" % charset
raise
return value
def __delitem__(self, key):
self.pop(key)
def __setitem__(self, key, value):
key = self._convert_to_charset(key, "ascii")
value = self._convert_to_charset(value, "latin-1", mime_encode=True)
self._store[key.lower()] = (key, value)
def pop(self, key, default=None):
return self._store.pop(key.lower(), default)
def setdefault(self, key, value):
if key not in self:
self[key] = value
class BadHeaderError(ValueError):
pass
class HttpResponseBase:
"""
An HTTP response base class with dictionary-accessed headers.
This class doesn't handle content. It should not be used directly.
Use the HttpResponse and StreamingHttpResponse subclasses instead.
"""
status_code = 200
def __init__(
self, content_type=None, status=None, reason=None, charset=None, headers=None
):
self.headers = ResponseHeaders(headers)
self._charset = charset
if "Content-Type" not in self.headers:
if content_type is None:
content_type = f"text/html; charset={self.charset}"
self.headers["Content-Type"] = content_type
elif content_type:
raise ValueError(
"'headers' must not contain 'Content-Type' when the "
"'content_type' parameter is provided."
)
self._resource_closers = []
# This parameter is set by the handler. It's necessary to preserve the
# historical behavior of request_finished.
self._handler_class = None
self.cookies = SimpleCookie()
self.closed = False
if status is not None:
try:
self.status_code = int(status)
except (ValueError, TypeError):
raise TypeError("HTTP status code must be an integer.")
if not 100 <= self.status_code <= 599:
raise ValueError("HTTP status code must be an integer from 100 to 599.")
self._reason_phrase = reason
@property
def reason_phrase(self):
if self._reason_phrase is not None:
return self._reason_phrase
# Leave self._reason_phrase unset in order to use the default
# reason phrase for status code.
return responses.get(self.status_code, "Unknown Status Code")
@reason_phrase.setter
def reason_phrase(self, value):
self._reason_phrase = value
@property
def charset(self):
if self._charset is not None:
return self._charset
# The Content-Type header may not yet be set, because the charset is
# being inserted *into* it.
if content_type := self.headers.get("Content-Type"):
if matched := _charset_from_content_type_re.search(content_type):
# Extract the charset and strip its double quotes.
# Note that having parsed it from the Content-Type, we don't
# store it back into the _charset for later intentionally, to
# allow for the Content-Type to be switched again later.
return matched["charset"].replace('"', "")
return settings.DEFAULT_CHARSET
@charset.setter
def charset(self, value):
self._charset = value
def serialize_headers(self):
"""HTTP headers as a bytestring."""
return b"\r\n".join(
[
key.encode("ascii") + b": " + value.encode("latin-1")
for key, value in self.headers.items()
]
)
__bytes__ = serialize_headers
@property
def _content_type_for_repr(self):
return (
', "%s"' % self.headers["Content-Type"]
if "Content-Type" in self.headers
else ""
)
def __setitem__(self, header, value):
self.headers[header] = value
def __delitem__(self, header):
del self.headers[header]
def __getitem__(self, header):
return self.headers[header]
def has_header(self, header):
"""Case-insensitive check for a header."""
return header in self.headers
__contains__ = has_header
def items(self):
return self.headers.items()
def get(self, header, alternate=None):
return self.headers.get(header, alternate)
def set_cookie(
self,
key,
value="",
max_age=None,
expires=None,
path="/",
domain=None,
secure=False,
httponly=False,
samesite=None,
):
"""
Set a cookie.
``expires`` can be:
- a string in the correct format,
- a naive ``datetime.datetime`` object in UTC,
- an aware ``datetime.datetime`` object in any time zone.
If it is a ``datetime.datetime`` object then calculate ``max_age``.
``max_age`` can be:
- int/float specifying seconds,
- ``datetime.timedelta`` object.
"""
self.cookies[key] = value
if expires is not None:
if isinstance(expires, datetime.datetime):
if timezone.is_naive(expires):
expires = timezone.make_aware(expires, datetime.timezone.utc)
delta = expires - datetime.datetime.now(tz=datetime.timezone.utc)
# Add one second so the date matches exactly (a fraction of
# time gets lost between converting to a timedelta and
# then the date string).
delta += datetime.timedelta(seconds=1)
# Just set max_age - the max_age logic will set expires.
expires = None
if max_age is not None:
raise ValueError("'expires' and 'max_age' can't be used together.")
max_age = max(0, delta.days * 86400 + delta.seconds)
else:
self.cookies[key]["expires"] = expires
else:
self.cookies[key]["expires"] = ""
if max_age is not None:
if isinstance(max_age, datetime.timedelta):
max_age = max_age.total_seconds()
self.cookies[key]["max-age"] = int(max_age)
# IE requires expires, so set it if it hasn't been already.
if not expires:
self.cookies[key]["expires"] = http_date(time.time() + max_age)
if path is not None:
self.cookies[key]["path"] = path
if domain is not None:
self.cookies[key]["domain"] = domain
if secure:
self.cookies[key]["secure"] = True
if httponly:
self.cookies[key]["httponly"] = True
if samesite:
if samesite.lower() not in ("lax", "none", "strict"):
raise ValueError('samesite must be "lax", "none", or "strict".')
self.cookies[key]["samesite"] = samesite
def setdefault(self, key, value):
"""Set a header unless it has already been set."""
self.headers.setdefault(key, value)
def set_signed_cookie(self, key, value, salt="", **kwargs):
value = signing.get_cookie_signer(salt=key + salt).sign(value)
return self.set_cookie(key, value, **kwargs)
def delete_cookie(self, key, path="/", domain=None, samesite=None):
# Browsers can ignore the Set-Cookie header if the cookie doesn't use
# the secure flag and:
# - the cookie name starts with "__Host-" or "__Secure-", or
# - the samesite is "none".
secure = key.startswith(("__Secure-", "__Host-")) or (
samesite and samesite.lower() == "none"
)
self.set_cookie(
key,
max_age=0,
path=path,
domain=domain,
secure=secure,
expires="Thu, 01 Jan 1970 00:00:00 GMT",
samesite=samesite,
)
# Common methods used by subclasses
def make_bytes(self, value):
"""Turn a value into a bytestring encoded in the output charset."""
# Per PEP 3333, this response body must be bytes. To avoid returning
# an instance of a subclass, this function returns `bytes(value)`.
# This doesn't make a copy when `value` already contains bytes.
# Handle string types -- we can't rely on force_bytes here because:
# - Python attempts str conversion first
# - when self._charset != 'utf-8' it re-encodes the content
if isinstance(value, (bytes, memoryview)):
return bytes(value)
if isinstance(value, str):
return bytes(value.encode(self.charset))
# Handle non-string types.
return str(value).encode(self.charset)
# These methods partially implement the file-like object interface.
# See https://docs.python.org/library/io.html#io.IOBase
# The WSGI server must call this method upon completion of the request.
# See http://blog.dscpl.com.au/2012/10/obligations-for-calling-close-on.html
def close(self):
for closer in self._resource_closers:
try:
closer()
except Exception:
pass
# Free resources that were still referenced.
self._resource_closers.clear()
self.closed = True
signals.request_finished.send(sender=self._handler_class)
def write(self, content):
raise OSError("This %s instance is not writable" % self.__class__.__name__)
def flush(self):
pass
def tell(self):
raise OSError(
"This %s instance cannot tell its position" % self.__class__.__name__
)
# These methods partially implement a stream-like object interface.
# See https://docs.python.org/library/io.html#io.IOBase
def readable(self):
return False
def seekable(self):
return False
def writable(self):
return False
def writelines(self, lines):
raise OSError("This %s instance is not writable" % self.__class__.__name__)
class HttpResponse(HttpResponseBase):
"""
An HTTP response class with a string as content.
This content can be read, appended to, or replaced.
"""
streaming = False
def __init__(self, content=b"", *args, **kwargs):
super().__init__(*args, **kwargs)
# Content is a bytestring. See the `content` property methods.
self.content = content
def __repr__(self):
return "<%(cls)s status_code=%(status_code)d%(content_type)s>" % {
"cls": self.__class__.__name__,
"status_code": self.status_code,
"content_type": self._content_type_for_repr,
}
def serialize(self):
"""Full HTTP message, including headers, as a bytestring."""
return self.serialize_headers() + b"\r\n\r\n" + self.content
__bytes__ = serialize
@property
def content(self):
return b"".join(self._container)
@content.setter
def content(self, value):
# Consume iterators upon assignment to allow repeated iteration.
if hasattr(value, "__iter__") and not isinstance(
value, (bytes, memoryview, str)
):
content = b"".join(self.make_bytes(chunk) for chunk in value)
if hasattr(value, "close"):
try:
value.close()
except Exception:
pass
else:
content = self.make_bytes(value)
# Create a list of properly encoded bytestrings to support write().
self._container = [content]
def __iter__(self):
return iter(self._container)
def write(self, content):
self._container.append(self.make_bytes(content))
def tell(self):
return len(self.content)
def getvalue(self):
return self.content
def writable(self):
return True
def writelines(self, lines):
for line in lines:
self.write(line)
class StreamingHttpResponse(HttpResponseBase):
"""
A streaming HTTP response class with an iterator as content.
This should only be iterated once, when the response is streamed to the
client. However, it can be appended to or replaced with a new iterator
that wraps the original content (or yields entirely new content).
"""
streaming = True
def __init__(self, streaming_content=(), *args, **kwargs):
super().__init__(*args, **kwargs)
# `streaming_content` should be an iterable of bytestrings.
# See the `streaming_content` property methods.
self.streaming_content = streaming_content
def __repr__(self):
return "<%(cls)s status_code=%(status_code)d%(content_type)s>" % {
"cls": self.__class__.__qualname__,
"status_code": self.status_code,
"content_type": self._content_type_for_repr,
}
@property
def content(self):
raise AttributeError(
"This %s instance has no `content` attribute. Use "
"`streaming_content` instead." % self.__class__.__name__
)
@property
def streaming_content(self):
if self.is_async:
# pull to lexical scope to capture fixed reference in case
# streaming_content is set again later.
_iterator = self._iterator
async def awrapper():
async for part in _iterator:
yield self.make_bytes(part)
return awrapper()
else:
return map(self.make_bytes, self._iterator)
@streaming_content.setter
def streaming_content(self, value):
self._set_streaming_content(value)
def _set_streaming_content(self, value):
# Ensure we can never iterate on "value" more than once.
try:
self._iterator = iter(value)
self.is_async = False
except TypeError:
self._iterator = value.__aiter__()
self.is_async = True
if hasattr(value, "close"):
self._resource_closers.append(value.close)
def __iter__(self):
try:
return iter(self.streaming_content)
except TypeError:
warnings.warn(
"StreamingHttpResponse must consume asynchronous iterators in order to "
"serve them synchronously. Use a synchronous iterator instead.",
Warning,
)
# async iterator. Consume in async_to_sync and map back.
async def to_list(_iterator):
as_list = []
async for chunk in _iterator:
as_list.append(chunk)
return as_list
return map(self.make_bytes, iter(async_to_sync(to_list)(self._iterator)))
async def __aiter__(self):
try:
async for part in self.streaming_content:
yield part
except TypeError:
warnings.warn(
"StreamingHttpResponse must consume synchronous iterators in order to "
"serve them asynchronously. Use an asynchronous iterator instead.",
Warning,
)
# sync iterator. Consume via sync_to_async and yield via async
# generator.
for part in await sync_to_async(list)(self.streaming_content):
yield part
def getvalue(self):
return b"".join(self.streaming_content)
class FileResponse(StreamingHttpResponse):
"""
A streaming HTTP response class optimized for files.
"""
block_size = 4096
def __init__(self, *args, as_attachment=False, filename="", **kwargs):
self.as_attachment = as_attachment
self.filename = filename
self._no_explicit_content_type = (
"content_type" not in kwargs or kwargs["content_type"] is None
)
super().__init__(*args, **kwargs)
def _set_streaming_content(self, value):
if not hasattr(value, "read"):
self.file_to_stream = None
return super()._set_streaming_content(value)
self.file_to_stream = filelike = value
if hasattr(filelike, "close"):
self._resource_closers.append(filelike.close)
value = iter(lambda: filelike.read(self.block_size), b"")
self.set_headers(filelike)
super()._set_streaming_content(value)
def set_headers(self, filelike):
"""
Set some common response headers (Content-Length, Content-Type, and
Content-Disposition) based on the `filelike` response content.
"""
filename = getattr(filelike, "name", "")
filename = filename if isinstance(filename, str) else ""
seekable = hasattr(filelike, "seek") and (
not hasattr(filelike, "seekable") or filelike.seekable()
)
if hasattr(filelike, "tell"):
if seekable:
initial_position = filelike.tell()
filelike.seek(0, io.SEEK_END)
self.headers["Content-Length"] = filelike.tell() - initial_position
filelike.seek(initial_position)
elif hasattr(filelike, "getbuffer"):
self.headers["Content-Length"] = (
filelike.getbuffer().nbytes - filelike.tell()
)
elif os.path.exists(filename):
self.headers["Content-Length"] = (
os.path.getsize(filename) - filelike.tell()
)
elif seekable:
self.headers["Content-Length"] = sum(
iter(lambda: len(filelike.read(self.block_size)), 0)
)
filelike.seek(-int(self.headers["Content-Length"]), io.SEEK_END)
filename = os.path.basename(self.filename or filename)
if self._no_explicit_content_type:
if filename:
content_type, encoding = mimetypes.guess_type(filename)
# Encoding isn't set to prevent browsers from automatically
# uncompressing files.
content_type = {
"bzip2": "application/x-bzip",
"gzip": "application/gzip",
"xz": "application/x-xz",
}.get(encoding, content_type)
self.headers["Content-Type"] = (
content_type or "application/octet-stream"
)
else:
self.headers["Content-Type"] = "application/octet-stream"
if content_disposition := content_disposition_header(
self.as_attachment, filename
):
self.headers["Content-Disposition"] = content_disposition
class HttpResponseRedirectBase(HttpResponse):
allowed_schemes = ["http", "https", "ftp"]
def __init__(self, redirect_to, *args, **kwargs):
super().__init__(*args, **kwargs)
self["Location"] = iri_to_uri(redirect_to)
parsed = urlparse(str(redirect_to))
if parsed.scheme and parsed.scheme not in self.allowed_schemes:
raise DisallowedRedirect(
"Unsafe redirect to URL with protocol '%s'" % parsed.scheme
)
url = property(lambda self: self["Location"])
def __repr__(self):
return (
'<%(cls)s status_code=%(status_code)d%(content_type)s, url="%(url)s">'
% {
"cls": self.__class__.__name__,
"status_code": self.status_code,
"content_type": self._content_type_for_repr,
"url": self.url,
}
)
class HttpResponseRedirect(HttpResponseRedirectBase):
status_code = 302
class HttpResponsePermanentRedirect(HttpResponseRedirectBase):
status_code = 301
class HttpResponseNotModified(HttpResponse):
status_code = 304
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
del self["content-type"]
@HttpResponse.content.setter
def content(self, value):
if value:
raise AttributeError(
"You cannot set content to a 304 (Not Modified) response"
)
self._container = []
class HttpResponseBadRequest(HttpResponse):
status_code = 400
class HttpResponseNotFound(HttpResponse):
status_code = 404
class HttpResponseForbidden(HttpResponse):
status_code = 403
class HttpResponseNotAllowed(HttpResponse):
status_code = 405
def __init__(self, permitted_methods, *args, **kwargs):
super().__init__(*args, **kwargs)
self["Allow"] = ", ".join(permitted_methods)
def __repr__(self):
return "<%(cls)s [%(methods)s] status_code=%(status_code)d%(content_type)s>" % {
"cls": self.__class__.__name__,
"status_code": self.status_code,
"content_type": self._content_type_for_repr,
"methods": self["Allow"],
}
class HttpResponseGone(HttpResponse):
status_code = 410
class HttpResponseServerError(HttpResponse):
status_code = 500
class Http404(Exception):
pass
class JsonResponse(HttpResponse):
"""
An HTTP response class that consumes data to be serialized to JSON.
:param data: Data to be dumped into json. By default only ``dict`` objects
are allowed to be passed due to a security flaw before ECMAScript 5. See
the ``safe`` parameter for more information.
:param encoder: Should be a json encoder class. Defaults to
``django.core.serializers.json.DjangoJSONEncoder``.
:param safe: Controls if only ``dict`` objects may be serialized. Defaults
to ``True``.
:param json_dumps_params: A dictionary of kwargs passed to json.dumps().
"""
def __init__(
self,
data,
encoder=DjangoJSONEncoder,
safe=True,
json_dumps_params=None,
**kwargs,
):
if safe and not isinstance(data, dict):
raise TypeError(
"In order to allow non-dict objects to be serialized set the "
"safe parameter to False."
)
if json_dumps_params is None:
json_dumps_params = {}
kwargs.setdefault("content_type", "application/json")
data = json.dumps(data, cls=encoder, **json_dumps_params)
super().__init__(content=data, **kwargs)
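Tying the response classes together, a short sketch (not from the commit; assumes configured settings for the default charset):

from django.conf import settings

settings.configure()  # stand-in for a real project's settings module

from django.http import HttpResponse, JsonResponse

resp = JsonResponse({"ok": True})
resp["Content-Type"]  # "application/json"
resp.content          # b'{"ok": true}'

plain = HttpResponse("hello", content_type="text/plain")
plain.write(" world")  # HttpResponse is writable; see writable() above
plain.content          # b"hello world"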