init
This commit is contained in:
705
.env/lib/python3.10/site-packages/flask/helpers.py
Normal file
705
.env/lib/python3.10/site-packages/flask/helpers.py
Normal file
@ -0,0 +1,705 @@
|
||||
import os
|
||||
import pkgutil
|
||||
import socket
|
||||
import sys
|
||||
import typing as t
|
||||
from datetime import datetime
|
||||
from functools import lru_cache
|
||||
from functools import update_wrapper
|
||||
from threading import RLock
|
||||
|
||||
import werkzeug.utils
|
||||
from werkzeug.exceptions import abort as _wz_abort
|
||||
from werkzeug.utils import redirect as _wz_redirect
|
||||
|
||||
from .globals import _cv_request
|
||||
from .globals import current_app
|
||||
from .globals import request
|
||||
from .globals import request_ctx
|
||||
from .globals import session
|
||||
from .signals import message_flashed
|
||||
|
||||
if t.TYPE_CHECKING: # pragma: no cover
|
||||
from werkzeug.wrappers import Response as BaseResponse
|
||||
from .wrappers import Response
|
||||
import typing_extensions as te
|
||||
|
||||
|
||||
def get_env() -> str:
|
||||
"""Get the environment the app is running in, indicated by the
|
||||
:envvar:`FLASK_ENV` environment variable. The default is
|
||||
``'production'``.
|
||||
|
||||
.. deprecated:: 2.2
|
||||
Will be removed in Flask 2.3.
|
||||
"""
|
||||
import warnings
|
||||
|
||||
warnings.warn(
|
||||
"'FLASK_ENV' and 'get_env' are deprecated and will be removed"
|
||||
" in Flask 2.3. Use 'FLASK_DEBUG' instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
return os.environ.get("FLASK_ENV") or "production"
|
||||
|
||||
|
||||
def get_debug_flag() -> bool:
|
||||
"""Get whether debug mode should be enabled for the app, indicated by the
|
||||
:envvar:`FLASK_DEBUG` environment variable. The default is ``False``.
|
||||
"""
|
||||
val = os.environ.get("FLASK_DEBUG")
|
||||
|
||||
if not val:
|
||||
env = os.environ.get("FLASK_ENV")
|
||||
|
||||
if env is not None:
|
||||
print(
|
||||
"'FLASK_ENV' is deprecated and will not be used in"
|
||||
" Flask 2.3. Use 'FLASK_DEBUG' instead.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return env == "development"
|
||||
|
||||
return False
|
||||
|
||||
return val.lower() not in {"0", "false", "no"}
|
||||
|
||||
|
||||
def get_load_dotenv(default: bool = True) -> bool:
|
||||
"""Get whether the user has disabled loading default dotenv files by
|
||||
setting :envvar:`FLASK_SKIP_DOTENV`. The default is ``True``, load
|
||||
the files.
|
||||
|
||||
:param default: What to return if the env var isn't set.
|
||||
"""
|
||||
val = os.environ.get("FLASK_SKIP_DOTENV")
|
||||
|
||||
if not val:
|
||||
return default
|
||||
|
||||
return val.lower() in ("0", "false", "no")
|
||||
|
||||
|
||||
def stream_with_context(
|
||||
generator_or_function: t.Union[
|
||||
t.Iterator[t.AnyStr], t.Callable[..., t.Iterator[t.AnyStr]]
|
||||
]
|
||||
) -> t.Iterator[t.AnyStr]:
|
||||
"""Request contexts disappear when the response is started on the server.
|
||||
This is done for efficiency reasons and to make it less likely to encounter
|
||||
memory leaks with badly written WSGI middlewares. The downside is that if
|
||||
you are using streamed responses, the generator cannot access request bound
|
||||
information any more.
|
||||
|
||||
This function however can help you keep the context around for longer::
|
||||
|
||||
from flask import stream_with_context, request, Response
|
||||
|
||||
@app.route('/stream')
|
||||
def streamed_response():
|
||||
@stream_with_context
|
||||
def generate():
|
||||
yield 'Hello '
|
||||
yield request.args['name']
|
||||
yield '!'
|
||||
return Response(generate())
|
||||
|
||||
Alternatively it can also be used around a specific generator::
|
||||
|
||||
from flask import stream_with_context, request, Response
|
||||
|
||||
@app.route('/stream')
|
||||
def streamed_response():
|
||||
def generate():
|
||||
yield 'Hello '
|
||||
yield request.args['name']
|
||||
yield '!'
|
||||
return Response(stream_with_context(generate()))
|
||||
|
||||
.. versionadded:: 0.9
|
||||
"""
|
||||
try:
|
||||
gen = iter(generator_or_function) # type: ignore
|
||||
except TypeError:
|
||||
|
||||
def decorator(*args: t.Any, **kwargs: t.Any) -> t.Any:
|
||||
gen = generator_or_function(*args, **kwargs) # type: ignore
|
||||
return stream_with_context(gen)
|
||||
|
||||
return update_wrapper(decorator, generator_or_function) # type: ignore
|
||||
|
||||
def generator() -> t.Generator:
|
||||
ctx = _cv_request.get(None)
|
||||
if ctx is None:
|
||||
raise RuntimeError(
|
||||
"'stream_with_context' can only be used when a request"
|
||||
" context is active, such as in a view function."
|
||||
)
|
||||
with ctx:
|
||||
# Dummy sentinel. Has to be inside the context block or we're
|
||||
# not actually keeping the context around.
|
||||
yield None
|
||||
|
||||
# The try/finally is here so that if someone passes a WSGI level
|
||||
# iterator in we're still running the cleanup logic. Generators
|
||||
# don't need that because they are closed on their destruction
|
||||
# automatically.
|
||||
try:
|
||||
yield from gen
|
||||
finally:
|
||||
if hasattr(gen, "close"):
|
||||
gen.close() # type: ignore
|
||||
|
||||
# The trick is to start the generator. Then the code execution runs until
|
||||
# the first dummy None is yielded at which point the context was already
|
||||
# pushed. This item is discarded. Then when the iteration continues the
|
||||
# real generator is executed.
|
||||
wrapped_g = generator()
|
||||
next(wrapped_g)
|
||||
return wrapped_g
|
||||
|
||||
|
||||
def make_response(*args: t.Any) -> "Response":
|
||||
"""Sometimes it is necessary to set additional headers in a view. Because
|
||||
views do not have to return response objects but can return a value that
|
||||
is converted into a response object by Flask itself, it becomes tricky to
|
||||
add headers to it. This function can be called instead of using a return
|
||||
and you will get a response object which you can use to attach headers.
|
||||
|
||||
If view looked like this and you want to add a new header::
|
||||
|
||||
def index():
|
||||
return render_template('index.html', foo=42)
|
||||
|
||||
You can now do something like this::
|
||||
|
||||
def index():
|
||||
response = make_response(render_template('index.html', foo=42))
|
||||
response.headers['X-Parachutes'] = 'parachutes are cool'
|
||||
return response
|
||||
|
||||
This function accepts the very same arguments you can return from a
|
||||
view function. This for example creates a response with a 404 error
|
||||
code::
|
||||
|
||||
response = make_response(render_template('not_found.html'), 404)
|
||||
|
||||
The other use case of this function is to force the return value of a
|
||||
view function into a response which is helpful with view
|
||||
decorators::
|
||||
|
||||
response = make_response(view_function())
|
||||
response.headers['X-Parachutes'] = 'parachutes are cool'
|
||||
|
||||
Internally this function does the following things:
|
||||
|
||||
- if no arguments are passed, it creates a new response argument
|
||||
- if one argument is passed, :meth:`flask.Flask.make_response`
|
||||
is invoked with it.
|
||||
- if more than one argument is passed, the arguments are passed
|
||||
to the :meth:`flask.Flask.make_response` function as tuple.
|
||||
|
||||
.. versionadded:: 0.6
|
||||
"""
|
||||
if not args:
|
||||
return current_app.response_class()
|
||||
if len(args) == 1:
|
||||
args = args[0]
|
||||
return current_app.make_response(args) # type: ignore
|
||||
|
||||
|
||||
def url_for(
|
||||
endpoint: str,
|
||||
*,
|
||||
_anchor: t.Optional[str] = None,
|
||||
_method: t.Optional[str] = None,
|
||||
_scheme: t.Optional[str] = None,
|
||||
_external: t.Optional[bool] = None,
|
||||
**values: t.Any,
|
||||
) -> str:
|
||||
"""Generate a URL to the given endpoint with the given values.
|
||||
|
||||
This requires an active request or application context, and calls
|
||||
:meth:`current_app.url_for() <flask.Flask.url_for>`. See that method
|
||||
for full documentation.
|
||||
|
||||
:param endpoint: The endpoint name associated with the URL to
|
||||
generate. If this starts with a ``.``, the current blueprint
|
||||
name (if any) will be used.
|
||||
:param _anchor: If given, append this as ``#anchor`` to the URL.
|
||||
:param _method: If given, generate the URL associated with this
|
||||
method for the endpoint.
|
||||
:param _scheme: If given, the URL will have this scheme if it is
|
||||
external.
|
||||
:param _external: If given, prefer the URL to be internal (False) or
|
||||
require it to be external (True). External URLs include the
|
||||
scheme and domain. When not in an active request, URLs are
|
||||
external by default.
|
||||
:param values: Values to use for the variable parts of the URL rule.
|
||||
Unknown keys are appended as query string arguments, like
|
||||
``?a=b&c=d``.
|
||||
|
||||
.. versionchanged:: 2.2
|
||||
Calls ``current_app.url_for``, allowing an app to override the
|
||||
behavior.
|
||||
|
||||
.. versionchanged:: 0.10
|
||||
The ``_scheme`` parameter was added.
|
||||
|
||||
.. versionchanged:: 0.9
|
||||
The ``_anchor`` and ``_method`` parameters were added.
|
||||
|
||||
.. versionchanged:: 0.9
|
||||
Calls ``app.handle_url_build_error`` on build errors.
|
||||
"""
|
||||
return current_app.url_for(
|
||||
endpoint,
|
||||
_anchor=_anchor,
|
||||
_method=_method,
|
||||
_scheme=_scheme,
|
||||
_external=_external,
|
||||
**values,
|
||||
)
|
||||
|
||||
|
||||
def redirect(
|
||||
location: str, code: int = 302, Response: t.Optional[t.Type["BaseResponse"]] = None
|
||||
) -> "BaseResponse":
|
||||
"""Create a redirect response object.
|
||||
|
||||
If :data:`~flask.current_app` is available, it will use its
|
||||
:meth:`~flask.Flask.redirect` method, otherwise it will use
|
||||
:func:`werkzeug.utils.redirect`.
|
||||
|
||||
:param location: The URL to redirect to.
|
||||
:param code: The status code for the redirect.
|
||||
:param Response: The response class to use. Not used when
|
||||
``current_app`` is active, which uses ``app.response_class``.
|
||||
|
||||
.. versionadded:: 2.2
|
||||
Calls ``current_app.redirect`` if available instead of always
|
||||
using Werkzeug's default ``redirect``.
|
||||
"""
|
||||
if current_app:
|
||||
return current_app.redirect(location, code=code)
|
||||
|
||||
return _wz_redirect(location, code=code, Response=Response)
|
||||
|
||||
|
||||
def abort( # type: ignore[misc]
|
||||
code: t.Union[int, "BaseResponse"], *args: t.Any, **kwargs: t.Any
|
||||
) -> "te.NoReturn":
|
||||
"""Raise an :exc:`~werkzeug.exceptions.HTTPException` for the given
|
||||
status code.
|
||||
|
||||
If :data:`~flask.current_app` is available, it will call its
|
||||
:attr:`~flask.Flask.aborter` object, otherwise it will use
|
||||
:func:`werkzeug.exceptions.abort`.
|
||||
|
||||
:param code: The status code for the exception, which must be
|
||||
registered in ``app.aborter``.
|
||||
:param args: Passed to the exception.
|
||||
:param kwargs: Passed to the exception.
|
||||
|
||||
.. versionadded:: 2.2
|
||||
Calls ``current_app.aborter`` if available instead of always
|
||||
using Werkzeug's default ``abort``.
|
||||
"""
|
||||
if current_app:
|
||||
current_app.aborter(code, *args, **kwargs)
|
||||
|
||||
_wz_abort(code, *args, **kwargs)
|
||||
|
||||
|
||||
def get_template_attribute(template_name: str, attribute: str) -> t.Any:
|
||||
"""Loads a macro (or variable) a template exports. This can be used to
|
||||
invoke a macro from within Python code. If you for example have a
|
||||
template named :file:`_cider.html` with the following contents:
|
||||
|
||||
.. sourcecode:: html+jinja
|
||||
|
||||
{% macro hello(name) %}Hello {{ name }}!{% endmacro %}
|
||||
|
||||
You can access this from Python code like this::
|
||||
|
||||
hello = get_template_attribute('_cider.html', 'hello')
|
||||
return hello('World')
|
||||
|
||||
.. versionadded:: 0.2
|
||||
|
||||
:param template_name: the name of the template
|
||||
:param attribute: the name of the variable of macro to access
|
||||
"""
|
||||
return getattr(current_app.jinja_env.get_template(template_name).module, attribute)
|
||||
|
||||
|
||||
def flash(message: str, category: str = "message") -> None:
|
||||
"""Flashes a message to the next request. In order to remove the
|
||||
flashed message from the session and to display it to the user,
|
||||
the template has to call :func:`get_flashed_messages`.
|
||||
|
||||
.. versionchanged:: 0.3
|
||||
`category` parameter added.
|
||||
|
||||
:param message: the message to be flashed.
|
||||
:param category: the category for the message. The following values
|
||||
are recommended: ``'message'`` for any kind of message,
|
||||
``'error'`` for errors, ``'info'`` for information
|
||||
messages and ``'warning'`` for warnings. However any
|
||||
kind of string can be used as category.
|
||||
"""
|
||||
# Original implementation:
|
||||
#
|
||||
# session.setdefault('_flashes', []).append((category, message))
|
||||
#
|
||||
# This assumed that changes made to mutable structures in the session are
|
||||
# always in sync with the session object, which is not true for session
|
||||
# implementations that use external storage for keeping their keys/values.
|
||||
flashes = session.get("_flashes", [])
|
||||
flashes.append((category, message))
|
||||
session["_flashes"] = flashes
|
||||
message_flashed.send(
|
||||
current_app._get_current_object(), # type: ignore
|
||||
message=message,
|
||||
category=category,
|
||||
)
|
||||
|
||||
|
||||
def get_flashed_messages(
|
||||
with_categories: bool = False, category_filter: t.Iterable[str] = ()
|
||||
) -> t.Union[t.List[str], t.List[t.Tuple[str, str]]]:
|
||||
"""Pulls all flashed messages from the session and returns them.
|
||||
Further calls in the same request to the function will return
|
||||
the same messages. By default just the messages are returned,
|
||||
but when `with_categories` is set to ``True``, the return value will
|
||||
be a list of tuples in the form ``(category, message)`` instead.
|
||||
|
||||
Filter the flashed messages to one or more categories by providing those
|
||||
categories in `category_filter`. This allows rendering categories in
|
||||
separate html blocks. The `with_categories` and `category_filter`
|
||||
arguments are distinct:
|
||||
|
||||
* `with_categories` controls whether categories are returned with message
|
||||
text (``True`` gives a tuple, where ``False`` gives just the message text).
|
||||
* `category_filter` filters the messages down to only those matching the
|
||||
provided categories.
|
||||
|
||||
See :doc:`/patterns/flashing` for examples.
|
||||
|
||||
.. versionchanged:: 0.3
|
||||
`with_categories` parameter added.
|
||||
|
||||
.. versionchanged:: 0.9
|
||||
`category_filter` parameter added.
|
||||
|
||||
:param with_categories: set to ``True`` to also receive categories.
|
||||
:param category_filter: filter of categories to limit return values. Only
|
||||
categories in the list will be returned.
|
||||
"""
|
||||
flashes = request_ctx.flashes
|
||||
if flashes is None:
|
||||
flashes = session.pop("_flashes") if "_flashes" in session else []
|
||||
request_ctx.flashes = flashes
|
||||
if category_filter:
|
||||
flashes = list(filter(lambda f: f[0] in category_filter, flashes))
|
||||
if not with_categories:
|
||||
return [x[1] for x in flashes]
|
||||
return flashes
|
||||
|
||||
|
||||
def _prepare_send_file_kwargs(**kwargs: t.Any) -> t.Dict[str, t.Any]:
|
||||
if kwargs.get("max_age") is None:
|
||||
kwargs["max_age"] = current_app.get_send_file_max_age
|
||||
|
||||
kwargs.update(
|
||||
environ=request.environ,
|
||||
use_x_sendfile=current_app.config["USE_X_SENDFILE"],
|
||||
response_class=current_app.response_class,
|
||||
_root_path=current_app.root_path, # type: ignore
|
||||
)
|
||||
return kwargs
|
||||
|
||||
|
||||
def send_file(
|
||||
path_or_file: t.Union[os.PathLike, str, t.BinaryIO],
|
||||
mimetype: t.Optional[str] = None,
|
||||
as_attachment: bool = False,
|
||||
download_name: t.Optional[str] = None,
|
||||
conditional: bool = True,
|
||||
etag: t.Union[bool, str] = True,
|
||||
last_modified: t.Optional[t.Union[datetime, int, float]] = None,
|
||||
max_age: t.Optional[
|
||||
t.Union[int, t.Callable[[t.Optional[str]], t.Optional[int]]]
|
||||
] = None,
|
||||
) -> "Response":
|
||||
"""Send the contents of a file to the client.
|
||||
|
||||
The first argument can be a file path or a file-like object. Paths
|
||||
are preferred in most cases because Werkzeug can manage the file and
|
||||
get extra information from the path. Passing a file-like object
|
||||
requires that the file is opened in binary mode, and is mostly
|
||||
useful when building a file in memory with :class:`io.BytesIO`.
|
||||
|
||||
Never pass file paths provided by a user. The path is assumed to be
|
||||
trusted, so a user could craft a path to access a file you didn't
|
||||
intend. Use :func:`send_from_directory` to safely serve
|
||||
user-requested paths from within a directory.
|
||||
|
||||
If the WSGI server sets a ``file_wrapper`` in ``environ``, it is
|
||||
used, otherwise Werkzeug's built-in wrapper is used. Alternatively,
|
||||
if the HTTP server supports ``X-Sendfile``, configuring Flask with
|
||||
``USE_X_SENDFILE = True`` will tell the server to send the given
|
||||
path, which is much more efficient than reading it in Python.
|
||||
|
||||
:param path_or_file: The path to the file to send, relative to the
|
||||
current working directory if a relative path is given.
|
||||
Alternatively, a file-like object opened in binary mode. Make
|
||||
sure the file pointer is seeked to the start of the data.
|
||||
:param mimetype: The MIME type to send for the file. If not
|
||||
provided, it will try to detect it from the file name.
|
||||
:param as_attachment: Indicate to a browser that it should offer to
|
||||
save the file instead of displaying it.
|
||||
:param download_name: The default name browsers will use when saving
|
||||
the file. Defaults to the passed file name.
|
||||
:param conditional: Enable conditional and range responses based on
|
||||
request headers. Requires passing a file path and ``environ``.
|
||||
:param etag: Calculate an ETag for the file, which requires passing
|
||||
a file path. Can also be a string to use instead.
|
||||
:param last_modified: The last modified time to send for the file,
|
||||
in seconds. If not provided, it will try to detect it from the
|
||||
file path.
|
||||
:param max_age: How long the client should cache the file, in
|
||||
seconds. If set, ``Cache-Control`` will be ``public``, otherwise
|
||||
it will be ``no-cache`` to prefer conditional caching.
|
||||
|
||||
.. versionchanged:: 2.0
|
||||
``download_name`` replaces the ``attachment_filename``
|
||||
parameter. If ``as_attachment=False``, it is passed with
|
||||
``Content-Disposition: inline`` instead.
|
||||
|
||||
.. versionchanged:: 2.0
|
||||
``max_age`` replaces the ``cache_timeout`` parameter.
|
||||
``conditional`` is enabled and ``max_age`` is not set by
|
||||
default.
|
||||
|
||||
.. versionchanged:: 2.0
|
||||
``etag`` replaces the ``add_etags`` parameter. It can be a
|
||||
string to use instead of generating one.
|
||||
|
||||
.. versionchanged:: 2.0
|
||||
Passing a file-like object that inherits from
|
||||
:class:`~io.TextIOBase` will raise a :exc:`ValueError` rather
|
||||
than sending an empty file.
|
||||
|
||||
.. versionadded:: 2.0
|
||||
Moved the implementation to Werkzeug. This is now a wrapper to
|
||||
pass some Flask-specific arguments.
|
||||
|
||||
.. versionchanged:: 1.1
|
||||
``filename`` may be a :class:`~os.PathLike` object.
|
||||
|
||||
.. versionchanged:: 1.1
|
||||
Passing a :class:`~io.BytesIO` object supports range requests.
|
||||
|
||||
.. versionchanged:: 1.0.3
|
||||
Filenames are encoded with ASCII instead of Latin-1 for broader
|
||||
compatibility with WSGI servers.
|
||||
|
||||
.. versionchanged:: 1.0
|
||||
UTF-8 filenames as specified in :rfc:`2231` are supported.
|
||||
|
||||
.. versionchanged:: 0.12
|
||||
The filename is no longer automatically inferred from file
|
||||
objects. If you want to use automatic MIME and etag support,
|
||||
pass a filename via ``filename_or_fp`` or
|
||||
``attachment_filename``.
|
||||
|
||||
.. versionchanged:: 0.12
|
||||
``attachment_filename`` is preferred over ``filename`` for MIME
|
||||
detection.
|
||||
|
||||
.. versionchanged:: 0.9
|
||||
``cache_timeout`` defaults to
|
||||
:meth:`Flask.get_send_file_max_age`.
|
||||
|
||||
.. versionchanged:: 0.7
|
||||
MIME guessing and etag support for file-like objects was
|
||||
deprecated because it was unreliable. Pass a filename if you are
|
||||
able to, otherwise attach an etag yourself.
|
||||
|
||||
.. versionchanged:: 0.5
|
||||
The ``add_etags``, ``cache_timeout`` and ``conditional``
|
||||
parameters were added. The default behavior is to add etags.
|
||||
|
||||
.. versionadded:: 0.2
|
||||
"""
|
||||
return werkzeug.utils.send_file( # type: ignore[return-value]
|
||||
**_prepare_send_file_kwargs(
|
||||
path_or_file=path_or_file,
|
||||
environ=request.environ,
|
||||
mimetype=mimetype,
|
||||
as_attachment=as_attachment,
|
||||
download_name=download_name,
|
||||
conditional=conditional,
|
||||
etag=etag,
|
||||
last_modified=last_modified,
|
||||
max_age=max_age,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def send_from_directory(
|
||||
directory: t.Union[os.PathLike, str],
|
||||
path: t.Union[os.PathLike, str],
|
||||
**kwargs: t.Any,
|
||||
) -> "Response":
|
||||
"""Send a file from within a directory using :func:`send_file`.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
@app.route("/uploads/<path:name>")
|
||||
def download_file(name):
|
||||
return send_from_directory(
|
||||
app.config['UPLOAD_FOLDER'], name, as_attachment=True
|
||||
)
|
||||
|
||||
This is a secure way to serve files from a folder, such as static
|
||||
files or uploads. Uses :func:`~werkzeug.security.safe_join` to
|
||||
ensure the path coming from the client is not maliciously crafted to
|
||||
point outside the specified directory.
|
||||
|
||||
If the final path does not point to an existing regular file,
|
||||
raises a 404 :exc:`~werkzeug.exceptions.NotFound` error.
|
||||
|
||||
:param directory: The directory that ``path`` must be located under,
|
||||
relative to the current application's root path.
|
||||
:param path: The path to the file to send, relative to
|
||||
``directory``.
|
||||
:param kwargs: Arguments to pass to :func:`send_file`.
|
||||
|
||||
.. versionchanged:: 2.0
|
||||
``path`` replaces the ``filename`` parameter.
|
||||
|
||||
.. versionadded:: 2.0
|
||||
Moved the implementation to Werkzeug. This is now a wrapper to
|
||||
pass some Flask-specific arguments.
|
||||
|
||||
.. versionadded:: 0.5
|
||||
"""
|
||||
return werkzeug.utils.send_from_directory( # type: ignore[return-value]
|
||||
directory, path, **_prepare_send_file_kwargs(**kwargs)
|
||||
)
|
||||
|
||||
|
||||
def get_root_path(import_name: str) -> str:
|
||||
"""Find the root path of a package, or the path that contains a
|
||||
module. If it cannot be found, returns the current working
|
||||
directory.
|
||||
|
||||
Not to be confused with the value returned by :func:`find_package`.
|
||||
|
||||
:meta private:
|
||||
"""
|
||||
# Module already imported and has a file attribute. Use that first.
|
||||
mod = sys.modules.get(import_name)
|
||||
|
||||
if mod is not None and hasattr(mod, "__file__") and mod.__file__ is not None:
|
||||
return os.path.dirname(os.path.abspath(mod.__file__))
|
||||
|
||||
# Next attempt: check the loader.
|
||||
loader = pkgutil.get_loader(import_name)
|
||||
|
||||
# Loader does not exist or we're referring to an unloaded main
|
||||
# module or a main module without path (interactive sessions), go
|
||||
# with the current working directory.
|
||||
if loader is None or import_name == "__main__":
|
||||
return os.getcwd()
|
||||
|
||||
if hasattr(loader, "get_filename"):
|
||||
filepath = loader.get_filename(import_name) # type: ignore
|
||||
else:
|
||||
# Fall back to imports.
|
||||
__import__(import_name)
|
||||
mod = sys.modules[import_name]
|
||||
filepath = getattr(mod, "__file__", None)
|
||||
|
||||
# If we don't have a file path it might be because it is a
|
||||
# namespace package. In this case pick the root path from the
|
||||
# first module that is contained in the package.
|
||||
if filepath is None:
|
||||
raise RuntimeError(
|
||||
"No root path can be found for the provided module"
|
||||
f" {import_name!r}. This can happen because the module"
|
||||
" came from an import hook that does not provide file"
|
||||
" name information or because it's a namespace package."
|
||||
" In this case the root path needs to be explicitly"
|
||||
" provided."
|
||||
)
|
||||
|
||||
# filepath is import_name.py for a module, or __init__.py for a package.
|
||||
return os.path.dirname(os.path.abspath(filepath))
|
||||
|
||||
|
||||
class locked_cached_property(werkzeug.utils.cached_property):
|
||||
"""A :func:`property` that is only evaluated once. Like
|
||||
:class:`werkzeug.utils.cached_property` except access uses a lock
|
||||
for thread safety.
|
||||
|
||||
.. versionchanged:: 2.0
|
||||
Inherits from Werkzeug's ``cached_property`` (and ``property``).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
fget: t.Callable[[t.Any], t.Any],
|
||||
name: t.Optional[str] = None,
|
||||
doc: t.Optional[str] = None,
|
||||
) -> None:
|
||||
super().__init__(fget, name=name, doc=doc)
|
||||
self.lock = RLock()
|
||||
|
||||
def __get__(self, obj: object, type: type = None) -> t.Any: # type: ignore
|
||||
if obj is None:
|
||||
return self
|
||||
|
||||
with self.lock:
|
||||
return super().__get__(obj, type=type)
|
||||
|
||||
def __set__(self, obj: object, value: t.Any) -> None:
|
||||
with self.lock:
|
||||
super().__set__(obj, value)
|
||||
|
||||
def __delete__(self, obj: object) -> None:
|
||||
with self.lock:
|
||||
super().__delete__(obj)
|
||||
|
||||
|
||||
def is_ip(value: str) -> bool:
|
||||
"""Determine if the given string is an IP address.
|
||||
|
||||
:param value: value to check
|
||||
:type value: str
|
||||
|
||||
:return: True if string is an IP address
|
||||
:rtype: bool
|
||||
"""
|
||||
for family in (socket.AF_INET, socket.AF_INET6):
|
||||
try:
|
||||
socket.inet_pton(family, value)
|
||||
except OSError:
|
||||
pass
|
||||
else:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def _split_blueprint_path(name: str) -> t.List[str]:
|
||||
out: t.List[str] = [name]
|
||||
|
||||
if "." in name:
|
||||
out.extend(_split_blueprint_path(name.rpartition(".")[0]))
|
||||
|
||||
return out
|
Reference in New Issue
Block a user