
# NOTE: residue from the repository web view, not part of the module:
# 147 lines | 3.9 KiB | Raw Permalink Normal View History
# 2023-11-23 10:43:30 -05:00
"""
Various functionalities to make it easier to work with the libpq.
"""
# Copyright (C) 2020 The Psycopg Team
import os
import sys
import logging
import ctypes.util
from typing import cast, NamedTuple, Optional, Union
from .abc import PGconn, PGresult
from ._enums import ConnStatus, TransactionStatus, PipelineStatus
from .._compat import cache
from .._encodings import pgconn_encoding
# Module-level logger; find_libpq_full_path() reports pg_config lookup
# failures on it at debug level.
logger = logging.getLogger("psycopg.pq")

# Shorthand for the "connection OK" status, compared with `pgconn.status`
# by the helpers in this module.
OK = ConnStatus.OK
class PGnotify(NamedTuple):
    """
    Immutable record holding the three fields of a notification.

    NOTE(review): the field names match libpq's ``PGnotify`` struct (channel
    name, PID of the notifying backend, payload) -- confirm against the libpq
    documentation.
    """

    relname: bytes
    be_pid: int
    extra: bytes
class ConninfoOption(NamedTuple):
    """
    Immutable record describing a single connection option.

    `!envvar`, `!compiled` and `!val` may be `!None`; the other fields are
    always set.

    NOTE(review): the field names match libpq's ``PQconninfoOption`` struct --
    confirm the meaning of each field against the libpq documentation.
    """

    keyword: bytes
    envvar: Optional[bytes]
    compiled: Optional[bytes]
    val: Optional[bytes]
    label: bytes
    dispchar: bytes
    dispsize: int
class PGresAttDesc(NamedTuple):
    """
    Immutable record describing one attribute (column) of a result.

    NOTE(review): the field names match libpq's ``PGresAttDesc`` struct --
    confirm the meaning of each field against the libpq documentation.
    """

    name: bytes
    tableid: int
    columnid: int
    format: int
    typid: int
    typlen: int
    atttypmod: int
# NOTE(review): `cache` is imported from `.._compat` at file level but is not
# used in the visible code; if this lookup was meant to be memoized, the
# decorator is missing here -- confirm.
def find_libpq_full_path() -> Optional[str]:
    """
    Return the libpq library as located on this system, `!None` if not found.

    The value is whatever `ctypes.util.find_library()` returns for the
    platform-specific library name (``libpq.dll`` on Windows, ``libpq.dylib``
    on macOS, ``pq`` elsewhere).

    On macOS, if `!find_library()` finds nothing, fall back on the directory
    reported by ``pg_config --libdir``, but only if a ``libpq.dylib`` file
    actually exists there. Any failure running ``pg_config`` is logged at
    debug level and results in `!None`.
    """
    libname: Optional[str]

    if sys.platform == "win32":
        libname = ctypes.util.find_library("libpq.dll")

    elif sys.platform == "darwin":
        libname = ctypes.util.find_library("libpq.dylib")
        # (hopefully) temporary hack: libpq not in a standard place
        # https://github.com/orgs/Homebrew/discussions/3595
        # If pg_config is available and agrees, let's use its indications.
        if not libname:
            try:
                import subprocess as sp

                libdir = sp.check_output(["pg_config", "--libdir"]).strip().decode()
                libname = os.path.join(libdir, "libpq.dylib")
                if not os.path.exists(libname):
                    libname = None
            except Exception as ex:
                # Deliberately best-effort: a missing or failing pg_config
                # must not break the lookup, the caller just gets None.
                logger.debug("couldn't use pg_config to find libpq: %s", ex)

    else:
        libname = ctypes.util.find_library("pq")

    return libname
def error_message(obj: Union[PGconn, PGresult], encoding: str = "utf8") -> str:
    """
    Return an error message from a `PGconn` or `PGresult`.

    The return value is a `!str` (unlike pq data which is usually `!bytes`):
    use the connection encoding if available, otherwise the `!encoding`
    parameter as a fallback for decoding. Don't raise exceptions on decoding
    errors.

    If the object carries no message (or nothing is left after stripping the
    severity prefix) return the string ``"no details available"``.

    Raise `!TypeError` if `!obj` has neither an `!error_field` nor an
    `!error_message` attribute, i.e. it is not a `PGconn` or a `PGresult`.
    """
    bmsg: bytes

    if hasattr(obj, "error_field"):
        # obj is a PGresult
        obj = cast(PGresult, obj)
        bmsg = obj.error_message

    elif hasattr(obj, "error_message"):
        # obj is a PGconn: its encoding is used only if the connection is in
        # OK status, otherwise the `encoding` parameter is left untouched.
        if obj.status == OK:
            encoding = pgconn_encoding(obj)
        bmsg = obj.error_message

    else:
        raise TypeError(f"PGconn or PGresult expected, got {type(obj).__name__}")

    # strip severity and whitespaces
    if bmsg:
        bmsg = bmsg.split(b":", 1)[-1].strip()

    # The message may have become empty after stripping, so check it again.
    if bmsg:
        msg = bmsg.decode(encoding, "replace")
    else:
        msg = "no details available"

    return msg
def connection_summary(pgconn: PGconn) -> str:
    """
    Return summary information on a connection.

    Useful for __repr__

    The result has the form ``[STATUS]`` optionally followed by
    `` (key=value ...)``. If the connection status is OK, ``STATUS`` is the
    transaction status name (plus the pipeline status name, if the pipeline
    status is non-zero) and the connection details are included; otherwise
    ``STATUS`` is the connection status name and no details are reported.
    """
    parts = []
    if pgconn.status == OK:
        # Put together the [STATUS]
        status = TransactionStatus(pgconn.transaction_status).name
        if pgconn.pipeline_status:
            status += f", pipeline={PipelineStatus(pgconn.pipeline_status).name}"

        # Put together the (CONNECTION): leave out the host if it starts with
        # "/", the port if it is 5432, the user if it is the same as the
        # database name. The database is always reported.
        if not pgconn.host.startswith(b"/"):
            parts.append(("host", pgconn.host.decode()))
        if pgconn.port != b"5432":
            parts.append(("port", pgconn.port.decode()))
        if pgconn.user != pgconn.db:
            parts.append(("user", pgconn.user.decode()))
        parts.append(("database", pgconn.db.decode()))
    else:
        status = ConnStatus(pgconn.status).name

    sparts = " ".join(f"{key}={value}" for key, value in parts)
    if sparts:
        sparts = f" ({sparts})"
    return f"[{status}]{sparts}"