from __future__ import annotations
from contextlib import contextmanager
from pathlib import Path
from typing import Union
import warnings
from .config import load_config
from .engines import create_engine
__all__ = [
"Client",
]
[docs]class Client:
"""
Creates and manages database connections (currently
:py:class:`psycopg.Connection` or
:py:class:`sqlite3.Connection`).
For simple queries, use the :py:meth:`execute` method::
client = Client(...)
client.execute(...)
Use a client as context manager to obtain a database
connection object::
client = Client(...)
with client as conn
with conn.cursor() as cursor:
...
If all you need is a cursor, you can obtain one directly
from a client with the :py:meth:`cursor` method::
client = Client(...)
with client.cursor() as cursor:
...
Connection parameters are given as ``kwargs``.
Use the ``engine`` parameter to ``"postgres"`` (default) or ``sqlite``.
For SQLite, only ``dbpath`` is relevant.
It defaults to ``sqltrack.db``.
For Postgres, common options are:
* ``user``: defaults to USER env var
* ``dbname``: defaults to ``user``
* ``host``
* ``schema``: a shorthand for setting the ``search_path`` option.
For the full list of available parameters, see:
https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS
Parameters passed from Python take priority,
but they may also be passed as environment variables
``SQLTRACK_DSN_<PARAM>`` (e.g., ``SQLTRACK_DSN_USER``),
or loaded from a config file, by default ``./sqltrack.conf``.
SQLTrack classes and functions will connect to the database as required.
Nested contexts (:keyword:`with` blocks) reuse the same connection
(reentrant), so they can be used to avoid connecting to the
database multiple times over a short period.
The connection is closed and any uncomitted changes
are comitted when the outermost :keyword:`with` block ends.
E.g., the following snippet will connect only once::
def do_queries(client, ...):
with client.cursor() as cursor:
cursor.execute(...)
...
client = Client(...)
with client:
do_queries(client, ...)
do_queries(client, ...)
do_queries(client, ...)
One caveat of this approach is that all changes in a stack of
contexts implicitly happen within the same transaction.
All pending changes will be rolled back if an exception is raised.
You can avoid this by periodically calling :py:meth:`commit`.
Parameters:
config: Config dict or path to config file,
defaults to ``SQLTRACK_CONFIG_PATH`` environment variable,
and finally ``./sqltrack.conf``
kwargs: Connection parameters
"""
def __init__(self, config: Union[dict, str, Path, None] = None, **kwargs):
if not isinstance(config, dict):
config = load_config(config, **kwargs)
self.engine = create_engine(config)
self._conn = None
self._conn_level = 0
def __enter__(self) -> DBAPIConnection:
if self._conn_level == 0:
if self._conn is not None:
raise RuntimeError("BUG in __enter__: self._conn_level == 0, but self._conn is not None")
self._conn = self.engine.connect()
self._conn_level += 1
return self._conn
def __exit__(self, exc_type, exc_value, traceback):
self._conn_level -= 1
# make sure Client is in expected
if self._conn_level > 0:
return
elif self._conn_level < 0:
raise RuntimeError("BUG in __exit__: self._conn_level < 0")
elif self._conn is None:
raise RuntimeError("BUG in __exit__: self._conn_level == 0, but self._conn is None")
# adapted from psycopg.Connection.__exit__
# rollback in case of exception, commit otherwise
if exc_type:
# try to rollback, but if there are problems (connection in a bad
# state) just warn without clobbering the exception bubbling up.
try:
self._conn.rollback()
except Exception as e:
warnings.warning("error ignored in rollback on %s: %s", self, e)
else:
self._conn.commit()
self._conn.close()
self._conn = None
[docs] def execute(self, sql, parameters=()):
"""
Convenience function that connects to the DB, if necessary,
and executes the given query with optional parameters.
"""
with self as conn:
conn.execute(sql, parameters)
[docs] def executemany(self, sql, seq_of_parameters):
"""
Convenience function that connects to the DB, if necessary,
and calls executemany with the given sequence of parameters.
"""
with self as conn:
conn.executemany(sql, seq_of_parameters)
[docs] def executescript(self, sql_script):
"""
Convenience function that connects to the DB, if necessary,
and executes the given script.
"""
with self as conn, conn.cursor() as cursor:
cursor.executescript(sql_script)
[docs] def commit(self):
"""
Convenience function to call commit on the DB connection.
Raises :py:class:`RuntimeError` when not connected.
"""
if self._conn is None:
raise RuntimeError("not connected")
self._conn.commit()
[docs] def rollback(self):
"""
Convenience function to call rollback on the DB connection.
Raises :py:class:`RuntimeError` when not connected.
"""
if self._conn is None:
raise RuntimeError("not connected")
self._conn.rollback()
[docs] @contextmanager
def cursor(self) -> DBAPICursor:
"""
Connect to the DB and return a cursor.
Use in with statement::
with client.cursor() as cursor:
... cursor things ...
The connection is closed and any changes comitted
when the with block ends.
"""
with self as conn:
cur = conn.cursor()
try:
yield cur
finally:
cur.close()