945 lines
36 KiB
Python
945 lines
36 KiB
Python
|
import posixpath
|
||
|
import typing as t
|
||
|
import warnings
|
||
|
from pprint import pformat
|
||
|
from threading import Lock
|
||
|
|
||
|
from .._internal import _encode_idna
|
||
|
from .._internal import _get_environ
|
||
|
from .._internal import _to_str
|
||
|
from .._internal import _wsgi_decoding_dance
|
||
|
from ..datastructures import ImmutableDict
|
||
|
from ..datastructures import MultiDict
|
||
|
from ..exceptions import BadHost
|
||
|
from ..exceptions import HTTPException
|
||
|
from ..exceptions import MethodNotAllowed
|
||
|
from ..exceptions import NotFound
|
||
|
from ..urls import url_encode
|
||
|
from ..urls import url_join
|
||
|
from ..urls import url_quote
|
||
|
from ..wsgi import get_host
|
||
|
from .converters import DEFAULT_CONVERTERS
|
||
|
from .exceptions import BuildError
|
||
|
from .exceptions import NoMatch
|
||
|
from .exceptions import RequestAliasRedirect
|
||
|
from .exceptions import RequestPath
|
||
|
from .exceptions import RequestRedirect
|
||
|
from .exceptions import WebsocketMismatch
|
||
|
from .matcher import StateMachineMatcher
|
||
|
from .rules import _simple_rule_re
|
||
|
from .rules import Rule
|
||
|
|
||
|
if t.TYPE_CHECKING:
|
||
|
import typing_extensions as te
|
||
|
from _typeshed.wsgi import WSGIApplication
|
||
|
from _typeshed.wsgi import WSGIEnvironment
|
||
|
from .converters import BaseConverter
|
||
|
from .rules import RuleFactory
|
||
|
from ..wrappers.request import Request
|
||
|
|
||
|
|
||
|
class Map:
    """Registry of URL rules together with map-wide configuration.

    A few options exist only on the ``Map`` instance because they apply
    to every rule; the remaining ones are defaults that an individual
    rule may override. Apart from ``rules``, pass every argument by
    keyword.

    :param rules: sequence of url rules for this map.
    :param default_subdomain: The default subdomain for rules without a
        subdomain defined.
    :param charset: charset of the url. defaults to ``"utf-8"``
    :param strict_slashes: If a rule ends with a slash but the matched
        URL does not, redirect to the URL with a trailing slash.
    :param merge_slashes: Merge consecutive slashes when matching or
        building URLs. Matches will redirect to the normalized URL.
        Slashes in variable parts are not merged.
    :param redirect_defaults: Redirect to the default rule if the URL
        was not visited that way. This helps creating unique URLs.
    :param converters: A dict of converters added on top of the default
        converters. Redefining an existing name overrides the original
        converter.
    :param sort_parameters: If set to `True` the url parameters are
        sorted. See `url_encode` for more details.
    :param sort_key: The sort key function for `url_encode`.
    :param encoding_errors: the error method to use for decoding
    :param host_matching: if set to `True` it enables the host matching
        feature and disables the subdomain one. If enabled the `host`
        parameter to rules is used instead of the `subdomain` one.

    .. versionchanged:: 1.0
        If ``url_scheme`` is ``ws`` or ``wss``, only WebSocket rules
        will match.

    .. versionchanged:: 1.0
        Added ``merge_slashes``.

    .. versionchanged:: 0.7
        Added ``encoding_errors`` and ``host_matching``.

    .. versionchanged:: 0.5
        Added ``sort_parameters`` and ``sort_key``.
    """

    #: A dict of default converters to be used.
    default_converters = ImmutableDict(DEFAULT_CONVERTERS)

    #: The type of lock to use when updating.
    #:
    #: .. versionadded:: 1.0
    lock_class = Lock

    def __init__(
        self,
        rules: t.Optional[t.Iterable["RuleFactory"]] = None,
        default_subdomain: str = "",
        charset: str = "utf-8",
        strict_slashes: bool = True,
        merge_slashes: bool = True,
        redirect_defaults: bool = True,
        converters: t.Optional[t.Mapping[str, t.Type["BaseConverter"]]] = None,
        sort_parameters: bool = False,
        sort_key: t.Optional[t.Callable[[t.Any], t.Any]] = None,
        encoding_errors: str = "replace",
        host_matching: bool = False,
    ) -> None:
        # Internal bookkeeping: the matcher, the per-endpoint rule lists
        # and the "needs re-sorting" flag guarded by a lock.
        self._matcher = StateMachineMatcher(merge_slashes)
        self._rules_by_endpoint: t.Dict[str, t.List[Rule]] = {}
        self._remap = True
        self._remap_lock = self.lock_class()

        # Plain configuration values.
        self.default_subdomain = default_subdomain
        self.charset = charset
        self.encoding_errors = encoding_errors
        self.strict_slashes = strict_slashes
        self.merge_slashes = merge_slashes
        self.redirect_defaults = redirect_defaults
        self.host_matching = host_matching
        self.sort_parameters = sort_parameters
        self.sort_key = sort_key

        # Start from a private copy of the defaults, then layer the
        # caller-supplied converters on top.
        self.converters = self.default_converters.copy()
        if converters:
            self.converters.update(converters)

        # Rules are added last because binding a rule reads the
        # configuration stored above.
        if rules:
            for factory in rules:
                self.add(factory)

    def is_endpoint_expecting(self, endpoint: str, *arguments: str) -> bool:
        """Tell whether any rule of ``endpoint`` accepts all ``arguments``.

        Handy when some URLs expect e.g. a language code and others do
        not, and a wrapper around the builder wants to add the current
        language code only for the endpoints that expect it.

        :param endpoint: the endpoint to check.
        :param arguments: one or more argument names, passed
            positionally. Every one of them must be accepted by the
            same rule.
        """
        self.update()
        wanted = set(arguments)
        return any(
            wanted.issubset(candidate.arguments)
            for candidate in self._rules_by_endpoint[endpoint]
        )

    @property
    def _rules(self) -> t.List[Rule]:
        # Flatten the per-endpoint lists into one new list.
        flattened: t.List[Rule] = []
        for group in self._rules_by_endpoint.values():
            flattened.extend(group)
        return flattened

    def iter_rules(self, endpoint: t.Optional[str] = None) -> t.Iterator[Rule]:
        """Iterate over all rules or the rules of an endpoint.

        :param endpoint: if provided only the rules for that endpoint
            are returned.
        :return: an iterator
        """
        self.update()
        if endpoint is None:
            return iter(self._rules)
        return iter(self._rules_by_endpoint[endpoint])

    def add(self, rulefactory: "RuleFactory") -> None:
        """Bind the rules produced by ``rulefactory`` to this map and
        register them. The rule must not be bound to another map.

        :param rulefactory: a :class:`Rule` or :class:`RuleFactory`
        """
        for new_rule in rulefactory.get_rules(self):
            new_rule.bind(self)
            # Build-only rules are kept for URL building but never
            # handed to the matcher.
            if not new_rule.build_only:
                self._matcher.add(new_rule)
            bucket = self._rules_by_endpoint.setdefault(new_rule.endpoint, [])
            bucket.append(new_rule)
        # Force a re-sort on the next update().
        self._remap = True

    def bind(
        self,
        server_name: str,
        script_name: t.Optional[str] = None,
        subdomain: t.Optional[str] = None,
        url_scheme: str = "http",
        default_method: str = "GET",
        path_info: t.Optional[str] = None,
        query_args: t.Optional[t.Union[t.Mapping[str, t.Any], str]] = None,
    ) -> "MapAdapter":
        """Create a :class:`MapAdapter` for the given request details.

        `script_name` defaults to ``'/'`` when omitted or `None`. The
        `server_name` is required because the HTTP RFC demands absolute
        URLs for redirects, so every redirect exception raised by
        Werkzeug carries the full canonical URL.

        If no path_info is passed to :meth:`match` it will use the
        default path info passed to bind. That is of little use for
        manual bind calls, but helpful when binding a map to a WSGI
        environment which already contains the path info.

        `subdomain` falls back to the `default_subdomain` of this map
        if not defined. Without a `default_subdomain` the subdomain
        feature cannot be used.

        .. versionchanged:: 1.0
            If ``url_scheme`` is ``ws`` or ``wss``, only WebSocket rules
            will match.

        .. versionchanged:: 0.15
            ``path_info`` defaults to ``'/'`` if ``None``.

        .. versionchanged:: 0.8
            ``query_args`` can be a string.

        .. versionchanged:: 0.7
            Added ``query_args``.
        """
        server_name = server_name.lower()

        if self.host_matching and subdomain is not None:
            raise RuntimeError("host matching enabled and a subdomain was provided")

        if not self.host_matching and subdomain is None:
            subdomain = self.default_subdomain

        script_name = "/" if script_name is None else script_name
        path_info = "/" if path_info is None else path_info

        try:
            server_name = _encode_idna(server_name)  # type: ignore
        except UnicodeError as idna_error:
            raise BadHost() from idna_error

        return MapAdapter(
            self,
            server_name,
            script_name,
            subdomain,
            url_scheme,
            path_info,
            default_method,
            query_args,
        )

    def bind_to_environ(
        self,
        environ: t.Union["WSGIEnvironment", "Request"],
        server_name: t.Optional[str] = None,
        subdomain: t.Optional[str] = None,
    ) -> "MapAdapter":
        """Like :meth:`bind`, but the details are read from a WSGI
        environment. Because of limitations in the protocol there is no
        way to get the current subdomain and real `server_name` from
        the environment. If you don't provide it, Werkzeug will use
        `SERVER_NAME` and `SERVER_PORT` (or `HTTP_HOST` if provided) as
        the `server_name`, with the subdomain feature disabled.

        If `subdomain` is `None` but an environment and a server name
        are provided, the current subdomain is calculated
        automatically. Example: `server_name` is ``'example.com'`` and
        the `SERVER_NAME` in the wsgi `environ` is
        ``'staging.dev.example.com'``; the calculated subdomain will be
        ``'staging.dev'``.

        If the object passed as environ has an environ attribute, the
        value of this attribute is used instead. This allows you to
        pass request objects. Additionally `PATH_INFO` is added as a
        default of the :class:`MapAdapter` so that you don't have to
        pass the path info to the match method.

        .. versionchanged:: 1.0.0
            If the passed server name specifies port 443, it will match
            if the incoming scheme is ``https`` without a port.

        .. versionchanged:: 1.0.0
            A warning is shown when the passed server name does not
            match the incoming WSGI server name.

        .. versionchanged:: 0.8
           This will no longer raise a ValueError when an unexpected
           server name was passed.

        .. versionchanged:: 0.5
            previously this method accepted a bogus
            `calculate_subdomain` parameter that did not have any
            effect. It was removed because of that.

        :param environ: a WSGI environment.
        :param server_name: an optional server name hint (see above).
        :param subdomain: optionally the current subdomain (see above).
        """
        env = _get_environ(environ)
        wsgi_server_name = get_host(env).lower()
        scheme = env["wsgi.url_scheme"]

        # A request counts as a WebSocket upgrade when "upgrade" is one
        # of the Connection tokens and the Upgrade header is "websocket".
        connection_tokens = env.get("HTTP_CONNECTION", "").lower().split(",")
        upgrade = any(token.strip() == "upgrade" for token in connection_tokens)

        if upgrade and env.get("HTTP_UPGRADE", "").lower() == "websocket":
            scheme = "wss" if scheme == "https" else "ws"

        if server_name is None:
            server_name = wsgi_server_name
        else:
            server_name = server_name.lower()

            # strip standard port to match get_host()
            for schemes, port_suffix in (
                ({"http", "ws"}, ":80"),
                ({"https", "wss"}, ":443"),
            ):
                if scheme in schemes and server_name.endswith(port_suffix):
                    server_name = server_name[: -len(port_suffix)]
                    break

        if subdomain is None and not self.host_matching:
            cur_server_name = wsgi_server_name.split(".")
            real_server_name = server_name.split(".")
            offset = -len(real_server_name)

            if cur_server_name[offset:] == real_server_name:
                # Everything left of the configured name is the subdomain.
                leading_parts = [part for part in cur_server_name[:offset] if part]
                subdomain = ".".join(leading_parts)
            else:
                # This can happen even with valid configs if the server was
                # accessed directly by IP address under some situations.
                # Instead of raising an exception like in Werkzeug 0.7 or
                # earlier we go by an invalid subdomain which will result
                # in a 404 error on matching.
                warnings.warn(
                    f"Current server name {wsgi_server_name!r} doesn't match configured"
                    f" server name {server_name!r}",
                    stacklevel=2,
                )
                subdomain = "<invalid>"

        def _get_wsgi_string(name: str) -> t.Optional[str]:
            # Decode a WSGI value with the map charset; missing keys
            # stay None.
            raw = env.get(name)
            if raw is None:
                return None
            return _wsgi_decoding_dance(raw, self.charset)

        script_name = _get_wsgi_string("SCRIPT_NAME")
        path_info = _get_wsgi_string("PATH_INFO")
        query_args = _get_wsgi_string("QUERY_STRING")

        # Map.bind is called explicitly (not self.bind), as in the
        # original code.
        return Map.bind(
            self,
            server_name,
            script_name,
            subdomain,
            scheme,
            env["REQUEST_METHOD"],
            path_info,
            query_args=query_args,
        )

    def update(self) -> None:
        """Called before matching and building to keep the compiled
        rules in the correct order after things changed.
        """
        # Cheap check without the lock, then re-check once it is held.
        if self._remap:
            with self._remap_lock:
                if self._remap:
                    self._matcher.update()
                    for endpoint_rules in self._rules_by_endpoint.values():
                        endpoint_rules.sort(key=lambda r: r.build_compare_key())
                    self._remap = False

    def __repr__(self) -> str:
        rendered_rules = pformat(list(self.iter_rules()))
        return f"{type(self).__name__}({rendered_rules})"
|
||
|
|
||
|
|
||
|
class MapAdapter:

    """Returned by :meth:`Map.bind` or :meth:`Map.bind_to_environ`.
    Performs URL matching and building based on runtime information.
    """

    def __init__(
        self,
        map: Map,
        server_name: str,
        script_name: str,
        subdomain: t.Optional[str],
        url_scheme: str,
        path_info: str,
        default_method: str,
        query_args: t.Optional[t.Union[t.Mapping[str, t.Any], str]] = None,
    ):
        self.map = map
        self.server_name = _to_str(server_name)

        # The stored script name always ends with a slash.
        normalized_script = _to_str(script_name)
        if not normalized_script.endswith("/"):
            normalized_script = normalized_script + "/"
        self.script_name = normalized_script

        self.subdomain = _to_str(subdomain)
        self.url_scheme = _to_str(url_scheme)
        self.path_info = _to_str(path_info)
        self.default_method = _to_str(default_method)
        self.query_args = query_args
        # Bound to a WebSocket request when the scheme is ws/wss.
        self.websocket = self.url_scheme in {"ws", "wss"}

    def dispatch(
        self,
        view_func: t.Callable[[str, t.Mapping[str, t.Any]], "WSGIApplication"],
        path_info: t.Optional[str] = None,
        method: t.Optional[str] = None,
        catch_http_exceptions: bool = False,
    ) -> "WSGIApplication":
        """Run the complete dispatching process. `view_func` is called
        with the endpoint and a dict with the values for the view. It
        should look up the view function, call it, and return a
        response object or WSGI application. HTTP exceptions are not
        caught by default so that applications can display nicer error
        messages by catching them by hand. To stick with the default
        error messages pass ``catch_http_exceptions=True`` and the HTTP
        exceptions are caught and returned.

        Here a small example for the dispatch usage::

            from werkzeug.wrappers import Request, Response
            from werkzeug.wsgi import responder
            from werkzeug.routing import Map, Rule

            def on_index(request):
                return Response('Hello from the index')

            url_map = Map([Rule('/', endpoint='index')])
            views = {'index': on_index}

            @responder
            def application(environ, start_response):
                request = Request(environ)
                urls = url_map.bind_to_environ(environ)
                return urls.dispatch(lambda e, v: views[e](request, **v),
                                     catch_http_exceptions=True)

        Keep in mind that this method might return exception objects,
        too, so use :class:`Response.force_type` to get a response
        object.

        :param view_func: a function that is called with the endpoint as
                          first argument and the value dict as second. Has
                          to dispatch to the actual view function with this
                          information. (see above)
        :param path_info: the path info to use for matching. Overrides the
                          path info specified on binding.
        :param method: the HTTP method used for matching. Overrides the
                       method specified on binding.
        :param catch_http_exceptions: set to `True` to catch any of the
                                      werkzeug :class:`HTTPException`\\s.
        """
        try:
            # A redirect raised while matching is always returned,
            # regardless of ``catch_http_exceptions``.
            try:
                endpoint, args = self.match(path_info, method)
            except RequestRedirect as redirect:
                return redirect
            return view_func(endpoint, args)
        except HTTPException as http_error:
            if not catch_http_exceptions:
                raise
            return http_error

    @t.overload
    def match(  # type: ignore
        self,
        path_info: t.Optional[str] = None,
        method: t.Optional[str] = None,
        return_rule: "te.Literal[False]" = False,
        query_args: t.Optional[t.Union[t.Mapping[str, t.Any], str]] = None,
        websocket: t.Optional[bool] = None,
    ) -> t.Tuple[str, t.Mapping[str, t.Any]]:
        ...

    @t.overload
    def match(
        self,
        path_info: t.Optional[str] = None,
        method: t.Optional[str] = None,
        return_rule: "te.Literal[True]" = True,
        query_args: t.Optional[t.Union[t.Mapping[str, t.Any], str]] = None,
        websocket: t.Optional[bool] = None,
    ) -> t.Tuple[Rule, t.Mapping[str, t.Any]]:
        ...

    def match(
        self,
        path_info: t.Optional[str] = None,
        method: t.Optional[str] = None,
        return_rule: bool = False,
        query_args: t.Optional[t.Union[t.Mapping[str, t.Any], str]] = None,
        websocket: t.Optional[bool] = None,
    ) -> t.Tuple[t.Union[str, Rule], t.Mapping[str, t.Any]]:
        """Match the current path info and method (which defaults to
        `GET`) against the rules of the map. One of the following
        things happens:

        - you receive a `NotFound` exception that indicates that no URL
          is matching. A `NotFound` exception is also a WSGI
          application you can call to get a default "page not found"
          page (happens to be the same object as
          `werkzeug.exceptions.NotFound`)

        - you receive a `MethodNotAllowed` exception that indicates
          that there is a match for this URL but not for the current
          request method. This is useful for RESTful applications.

        - you receive a `RequestRedirect` exception with a `new_url`
          attribute. This exception tells you that Werkzeug wants your
          WSGI application to redirect. This is for example the case if
          you request ``/foo`` although the correct URL is ``/foo/``.
          You can use the `RequestRedirect` instance as response-like
          object similar to all other subclasses of `HTTPException`.

        - you receive a ``WebsocketMismatch`` exception if the only
          match is a WebSocket rule but the bind is an HTTP request, or
          if the match is an HTTP rule but the bind is a WebSocket
          request.

        - you get a tuple in the form ``(endpoint, arguments)`` if
          there is a match (unless `return_rule` is True, in which case
          you get a tuple in the form ``(rule, arguments)``)

        If the path info is not passed to the match method the default
        path info of the map is used (defaults to the root URL if not
        defined explicitly).

        All of the exceptions raised are subclasses of `HTTPException`
        so they can be used as WSGI responses. They will all render
        generic error or redirect pages.

        Here is a small example for matching:

        >>> m = Map([
        ...     Rule('/', endpoint='index'),
        ...     Rule('/downloads/', endpoint='downloads/index'),
        ...     Rule('/downloads/<int:id>', endpoint='downloads/show')
        ... ])
        >>> urls = m.bind("example.com", "/")
        >>> urls.match("/", "GET")
        ('index', {})
        >>> urls.match("/downloads/42")
        ('downloads/show', {'id': 42})

        And here is what happens on redirect and missing URLs:

        >>> urls.match("/downloads")
        Traceback (most recent call last):
          ...
        RequestRedirect: http://example.com/downloads/
        >>> urls.match("/missing")
        Traceback (most recent call last):
          ...
        NotFound: 404 Not Found

        :param path_info: the path info to use for matching. Overrides the
                          path info specified on binding.
        :param method: the HTTP method used for matching. Overrides the
                       method specified on binding.
        :param return_rule: return the rule that matched instead of just the
                            endpoint (defaults to `False`).
        :param query_args: optional query arguments that are used for
                           automatic redirects as string or dictionary. It's
                           currently not possible to use the query arguments
                           for URL matching.
        :param websocket: Match WebSocket instead of HTTP requests. A
            websocket request has a ``ws`` or ``wss``
            :attr:`url_scheme`. This overrides that detection.

        .. versionadded:: 1.0
            Added ``websocket``.

        .. versionchanged:: 0.8
            ``query_args`` can be a string.

        .. versionadded:: 0.7
            Added ``query_args``.

        .. versionadded:: 0.6
            Added ``return_rule``.
        """
        self.map.update()

        # Fall back to the values captured at bind time.
        if path_info is None:
            path_info = self.path_info
        else:
            path_info = _to_str(path_info, self.map.charset)

        if query_args is None:
            query_args = self.query_args or {}

        method = (method or self.default_method).upper()

        if websocket is None:
            websocket = self.websocket

        if self.map.host_matching:
            domain_part = self.server_name
        else:
            domain_part = self.subdomain

        # Normalize to exactly one leading slash; an empty path stays empty.
        path_part = f"/{path_info.lstrip('/')}" if path_info else ""

        try:
            result = self.map._matcher.match(domain_part, path_part, method, websocket)
        except RequestPath as e:
            quoted_path = url_quote(e.path_info, self.map.charset, safe="/:|+")
            raise RequestRedirect(
                self.make_redirect_url(quoted_path, query_args)
            ) from None
        except RequestAliasRedirect as e:
            alias_target = self.make_alias_redirect_url(
                f"{domain_part}|{path_part}",
                e.endpoint,
                e.matched_values,
                method,
                query_args,
            )
            raise RequestRedirect(alias_target) from None
        except NoMatch as e:
            if e.have_match_for:
                raise MethodNotAllowed(valid_methods=list(e.have_match_for)) from None

            if e.websocket_mismatch:
                raise WebsocketMismatch() from None

            raise NotFound() from None

        # Every handler above raises, so reaching this point means a
        # rule matched.
        matched_rule, matched_values = result

        if self.map.redirect_defaults:
            default_url = self.get_default_redirect(
                matched_rule, method, matched_values, query_args
            )

            if default_url is not None:
                raise RequestRedirect(default_url)

        if matched_rule.redirect_to is not None:
            if isinstance(matched_rule.redirect_to, str):

                def _handle_match(match: t.Match[str]) -> str:
                    # Replace a placeholder in the redirect target with
                    # the matched value converted back to URL form.
                    name = match.group(1)
                    value = matched_values[name]
                    return matched_rule._converters[name].to_url(value)

                target = _simple_rule_re.sub(_handle_match, matched_rule.redirect_to)
            else:
                target = matched_rule.redirect_to(self, **matched_values)

            if self.subdomain:
                netloc = f"{self.subdomain}.{self.server_name}"
            else:
                netloc = self.server_name

            base_url = f"{self.url_scheme or 'http'}://{netloc}{self.script_name}"
            raise RequestRedirect(url_join(base_url, target))

        if return_rule:
            return matched_rule, matched_values

        return matched_rule.endpoint, matched_values

    def test(
        self, path_info: t.Optional[str] = None, method: t.Optional[str] = None
    ) -> bool:
        """Test if a rule would match. Works like `match` but returns
        `True` if the URL matches, or `False` if it does not exist.

        :param path_info: the path info to use for matching. Overrides the
                          path info specified on binding.
        :param method: the HTTP method used for matching. Overrides the
                       method specified on binding.
        """
        try:
            self.match(path_info, method)
        except RequestRedirect:
            # A redirect still counts as a match.
            return True
        except HTTPException:
            return False
        return True

    def allowed_methods(self, path_info: t.Optional[str] = None) -> t.Iterable[str]:
        """Returns the valid methods that match for a given path.

        .. versionadded:: 0.7
        """
        # Match with the placeholder method "--" and read the valid
        # methods from a MethodNotAllowed error, if one is raised.
        try:
            self.match(path_info, method="--")
        except MethodNotAllowed as not_allowed:
            return not_allowed.valid_methods  # type: ignore
        except HTTPException:
            return []
        return []

    def get_host(self, domain_part: t.Optional[str]) -> str:
        """Figures out the full host name for the given domain part.
        The domain part is a subdomain in case host matching is
        disabled or a full host name.
        """
        if self.map.host_matching:
            if domain_part is None:
                return self.server_name
            return _to_str(domain_part, "ascii")

        if domain_part is None:
            subdomain = self.subdomain
        else:
            subdomain = _to_str(domain_part, "ascii")

        if not subdomain:
            return self.server_name

        return f"{subdomain}.{self.server_name}"

    def get_default_redirect(
        self,
        rule: Rule,
        method: str,
        values: t.MutableMapping[str, t.Any],
        query_args: t.Union[t.Mapping[str, t.Any], str],
    ) -> t.Optional[str]:
        """A helper that returns the URL to redirect to if it finds one.
        This is used for default redirecting only.

        :internal:
        """
        assert self.map.redirect_defaults

        for candidate in self.map._rules_by_endpoint[rule.endpoint]:
            # every rule that comes after this one, including ourself
            # has a lower priority for the defaults. We order the ones
            # with the highest priority up for building.
            if candidate is rule:
                return None

            if not candidate.provides_defaults_for(rule):
                continue

            if not candidate.suitable_for(values, method):
                continue

            values.update(candidate.defaults)  # type: ignore
            domain_part, path = candidate.build(values)  # type: ignore
            return self.make_redirect_url(path, query_args, domain_part=domain_part)

        return None

    def encode_query_args(self, query_args: t.Union[t.Mapping[str, t.Any], str]) -> str:
        # Strings are passed through unchanged; anything else is
        # encoded with the map charset.
        if isinstance(query_args, str):
            return query_args

        return url_encode(query_args, self.map.charset)

    def make_redirect_url(
        self,
        path_info: str,
        query_args: t.Optional[t.Union[t.Mapping[str, t.Any], str]] = None,
        domain_part: t.Optional[str] = None,
    ) -> str:
        """Creates a redirect URL.

        :internal:
        """
        suffix = f"?{self.encode_query_args(query_args)}" if query_args else ""
        scheme = self.url_scheme or "http"
        host = self.get_host(domain_part)
        script_root = self.script_name.strip("/")
        path = posixpath.join(script_root, path_info.lstrip("/"))
        return f"{scheme}://{host}/{path}{suffix}"

    def make_alias_redirect_url(
        self,
        path: str,
        endpoint: str,
        values: t.Mapping[str, t.Any],
        method: str,
        query_args: t.Union[t.Mapping[str, t.Any], str],
    ) -> str:
        """Internally called to make an alias redirect URL."""
        url = self.build(
            endpoint,
            values,
            method,
            append_unknown=False,
            force_external=True,
        )

        if query_args:
            url = f"{url}?{self.encode_query_args(query_args)}"

        assert url != path, "detected invalid alias setting. No canonical URL found"
        return url

    def _partial_build(
        self,
        endpoint: str,
        values: t.Mapping[str, t.Any],
        method: t.Optional[str],
        append_unknown: bool,
    ) -> t.Optional[t.Tuple[str, str, bool]]:
        """Helper for :meth:`build`. Returns the domain part, the path
        and the websocket flag of the rule that accepts this endpoint,
        values and method, or ``None`` if no rule does.

        :internal:
        """
        # in case the method is none, try with the default method first
        if method is None:
            preferred = self._partial_build(
                endpoint, values, self.default_method, append_unknown
            )

            if preferred is not None:
                return preferred

        # Default method did not match or a specific method is passed.
        # Check all for first match with matching host. If no matching
        # host is found, go with first result.
        fallback = None

        for candidate in self.map._rules_by_endpoint.get(endpoint, ()):
            if not candidate.suitable_for(values, method):
                continue

            built = candidate.build(values, append_unknown)

            if built is None:
                continue

            outcome = (built[0], built[1], candidate.websocket)

            if not self.map.host_matching:
                return outcome

            if outcome[0] == self.server_name:
                return outcome

            if fallback is None:
                fallback = outcome

        return fallback

    def build(
        self,
        endpoint: str,
        values: t.Optional[t.Mapping[str, t.Any]] = None,
        method: t.Optional[str] = None,
        force_external: bool = False,
        append_unknown: bool = True,
        url_scheme: t.Optional[str] = None,
    ) -> str:
        """Building URLs works pretty much the other way round. Instead
        of `match` you call `build` and pass it the endpoint and a dict
        of arguments for the placeholders.

        The `build` function also accepts an argument called
        `force_external` which, if you set it to `True` will force
        external URLs. Per default external URLs (include the server
        name) will only be used if the target URL is on a different
        subdomain.

        >>> m = Map([
        ...     Rule('/', endpoint='index'),
        ...     Rule('/downloads/', endpoint='downloads/index'),
        ...     Rule('/downloads/<int:id>', endpoint='downloads/show')
        ... ])
        >>> urls = m.bind("example.com", "/")
        >>> urls.build("index", {})
        '/'
        >>> urls.build("downloads/show", {'id': 42})
        '/downloads/42'
        >>> urls.build("downloads/show", {'id': 42}, force_external=True)
        'http://example.com/downloads/42'

        The result is always a ``str``. Because URLs cannot contain
        non ASCII data, non ASCII characters are urlencoded with the
        charset defined on the map instance.

        Additional values are converted to strings and appended to the
        URL as URL querystring parameters:

        >>> urls.build("index", {'q': 'My Searchstring'})
        '/?q=My+Searchstring'

        When processing those additional values, lists are furthermore
        interpreted as multiple values (as per
        :py:class:`werkzeug.datastructures.MultiDict`):

        >>> urls.build("index", {'q': ['a', 'b', 'c']})
        '/?q=a&q=b&q=c'

        Passing a ``MultiDict`` will also add multiple values:

        >>> urls.build("index", MultiDict((('p', 'z'), ('q', 'a'), ('q', 'b'))))
        '/?p=z&q=a&q=b'

        If a rule does not exist when building a `BuildError` exception
        is raised.

        The build method accepts an argument called `method` which
        allows you to specify the method you want to have an URL built
        for if you have different methods for the same endpoint
        specified.

        :param endpoint: the endpoint of the URL to build.
        :param values: the values for the URL to build.  Unhandled values are
                       appended to the URL as query parameters.
        :param method: the HTTP method for the rule if there are different
                       URLs for different methods on the same endpoint.
        :param force_external: enforce full canonical external URLs. If the URL
                               scheme is not provided, this will generate
                               a protocol-relative URL.
        :param append_unknown: unknown parameters are appended to the generated
                               URL as query string argument.  Disable this
                               if you want the builder to ignore those.
        :param url_scheme: Scheme to use in place of the bound
            :attr:`url_scheme`.

        .. versionchanged:: 2.0
            Added the ``url_scheme`` parameter.

        .. versionadded:: 0.6
           Added the ``append_unknown`` parameter.
        """
        self.map.update()

        if not values:
            values = {}
        elif isinstance(values, MultiDict):
            # Read the raw value lists: drop empty ones and unwrap
            # single-item lists.
            collapsed = {}
            for key, items in dict.items(values):
                if len(items) == 0:
                    continue
                collapsed[key] = items[0] if len(items) == 1 else items
            values = collapsed
        else:  # plain dict
            values = {key: val for key, val in values.items() if val is not None}

        built = self._partial_build(endpoint, values, method, append_unknown)

        if built is None:
            raise BuildError(endpoint, values, method, self)

        domain_part, path, websocket = built
        host = self.get_host(domain_part)

        if url_scheme is None:
            url_scheme = self.url_scheme

        # Always build WebSocket routes with the scheme (browsers
        # require full URLs). If bound to a WebSocket, ensure that HTTP
        # routes are built with an HTTP scheme.
        secure = url_scheme in {"https", "wss"}

        if websocket:
            force_external = True
            url_scheme = "wss" if secure else "ws"
        elif url_scheme:
            url_scheme = "https" if secure else "http"

        # shortcut this.
        if not force_external:
            if self.map.host_matching:
                same_target = host == self.server_name
            else:
                same_target = domain_part == self.subdomain

            if same_target:
                return f"{self.script_name.rstrip('/')}/{path.lstrip('/')}"

        scheme = f"{url_scheme}:" if url_scheme else ""
        return f"{scheme}//{host}{self.script_name[:-1]}/{path.lstrip('/')}"
|