# Source code for sqltrack.client

from __future__ import annotations

from contextlib import contextmanager
from pathlib import Path
from typing import Union
import warnings

from .config import load_config
from .engines import create_engine


# Public API of this module: only ``Client`` is exported by
# ``from sqltrack.client import *``.
__all__ = [
    "Client",
]


class Client:
    """
    Creates and manages database connections (currently
    :py:class:`psycopg.Connection` or :py:class:`sqlite3.Connection`).

    For simple queries, use the :py:meth:`execute` method::

        client = Client(...)
        client.execute(...)

    Use a client as context manager to obtain a database connection
    object::

        client = Client(...)
        with client as conn:
            with conn.cursor() as cursor:
                ...

    If all you need is a cursor, you can obtain one directly from a client
    with the :py:meth:`cursor` method::

        client = Client(...)
        with client.cursor() as cursor:
            ...

    Connection parameters are given as ``kwargs``.
    Set the ``engine`` parameter to ``"postgres"`` (default) or ``sqlite``.
    For SQLite, only ``dbpath`` is relevant.
    It defaults to ``sqltrack.db``.
    For Postgres, common options are:

    * ``user``: defaults to USER env var
    * ``dbname``: defaults to ``user``
    * ``host``
    * ``schema``: a shorthand for setting the ``search_path`` option.

    For the full list of available parameters, see:
    https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS

    Parameters passed from Python take priority, but they may also be
    passed as environment variables ``SQLTRACK_DSN_<PARAM>``
    (e.g., ``SQLTRACK_DSN_USER``), or loaded from a config file,
    by default ``./sqltrack.conf``.

    SQLTrack classes and functions will connect to the database as
    required. Nested contexts (:keyword:`with` blocks) reuse the same
    connection (reentrant), so they can be used to avoid connecting to the
    database multiple times over a short period.
    The connection is closed and any uncommitted changes are committed
    when the outermost :keyword:`with` block ends.
    E.g., the following snippet will connect only once::

        def do_queries(client, ...):
            with client.cursor() as cursor:
                cursor.execute(...)
                ...

        client = Client(...)
        with client:
            do_queries(client, ...)
            do_queries(client, ...)
            do_queries(client, ...)

    One caveat of this approach is that all changes in a stack of contexts
    implicitly happen within the same transaction.
    All pending changes will be rolled back if an exception is raised.
    You can avoid this by periodically calling :py:meth:`commit`.

    Parameters:
        config: Config dict or path to config file, defaults to
            ``SQLTRACK_CONFIG_PATH`` environment variable,
            and finally ``./sqltrack.conf``.
            A dict is handed to the engine factory as-is; in that case
            ``kwargs`` are not applied.
        kwargs: Connection parameters
    """

    def __init__(self, config: Union[dict, str, Path, None] = None, **kwargs):
        # Anything that is not already a config dict (path, str, None) is
        # resolved through load_config together with the keyword overrides.
        if not isinstance(config, dict):
            config = load_config(config, **kwargs)
        self.engine = create_engine(config)
        # Shared connection of the current context stack, None when idle.
        self._conn = None
        # Nesting depth of active ``with`` blocks on this client.
        self._conn_level = 0

    # NOTE: DBAPIConnection / DBAPICursor are not imported in this module,
    # so they are spelled as string forward references.
    def __enter__(self) -> "DBAPIConnection":
        """
        Enter a (possibly nested) connection context.

        Only the outermost context opens a connection; nested contexts
        return the connection that is already open.
        """
        if self._conn_level == 0:
            if self._conn is not None:
                raise RuntimeError("BUG in __enter__: self._conn_level == 0, but self._conn is not None")
            self._conn = self.engine.connect()
        # Incremented only after a successful connect, so a failed connect
        # leaves the client in its idle state.
        self._conn_level += 1
        return self._conn

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Leave a connection context.

        Inner contexts only decrement the nesting level. The outermost
        context rolls back if an exception is propagating, commits
        otherwise, and always closes the connection and resets the client.
        """
        self._conn_level -= 1
        # make sure Client is in the expected state
        if self._conn_level > 0:
            return
        if self._conn_level < 0:
            # Unbalanced exit: restore the idle level so the client does not
            # stay stuck at a negative depth (which would make the next
            # __enter__ skip connecting).
            self._conn_level = 0
            raise RuntimeError("BUG in __exit__: self._conn_level < 0")
        if self._conn is None:
            raise RuntimeError("BUG in __exit__: self._conn_level == 0, but self._conn is None")
        conn = self._conn
        try:
            # adapted from psycopg.Connection.__exit__
            # rollback in case of exception, commit otherwise
            if exc_type:
                # try to rollback, but if there are problems (connection in
                # a bad state) just warn without clobbering the exception
                # bubbling up.
                try:
                    conn.rollback()
                except Exception as e:
                    warnings.warn(
                        f"error ignored in rollback on {self}: {e}",
                        RuntimeWarning,
                        stacklevel=2,
                    )
            else:
                conn.commit()
        finally:
            # Reset state before closing so the client stays usable even if
            # commit() or close() raises.
            self._conn = None
            conn.close()

    def execute(self, sql, parameters=()):
        """
        Convenience function that connects to the DB, if necessary,
        and executes the given query with optional parameters.
        """
        with self as conn:
            conn.execute(sql, parameters)

    def executemany(self, sql, seq_of_parameters):
        """
        Convenience function that connects to the DB, if necessary,
        and calls executemany with the given sequence of parameters.
        """
        with self as conn:
            conn.executemany(sql, seq_of_parameters)

    def executescript(self, sql_script):
        """
        Convenience function that connects to the DB, if necessary,
        and executes the given script.
        """
        with self as conn, conn.cursor() as cursor:
            cursor.executescript(sql_script)

    def commit(self):
        """
        Convenience function to call commit on the DB connection.
        Raises :py:class:`RuntimeError` when not connected.
        """
        if self._conn is None:
            raise RuntimeError("not connected")
        self._conn.commit()

    def rollback(self):
        """
        Convenience function to call rollback on the DB connection.
        Raises :py:class:`RuntimeError` when not connected.
        """
        if self._conn is None:
            raise RuntimeError("not connected")
        self._conn.rollback()

    @contextmanager
    def cursor(self) -> "DBAPICursor":
        """
        Connect to the DB and return a cursor.
        Use in with statement::

            with client.cursor() as cursor:
                ... cursor things ...

        The cursor is closed when the with block ends. If this is the
        outermost context on the client, the connection is also closed
        and any changes are committed (or rolled back on error).
        """
        with self as conn:
            cur = conn.cursor()
            try:
                yield cur
            finally:
                cur.close()