Source code for ipctools.store

"""Persist data to a key-value store.

The store is a file. By default it is in the systemp temporary directory, but
its full path may be specified with the ``IPCTOOLS_STORE`` environment
variable.
"""

import contextlib
import logging
import os
import pathlib
import sqlite3
import tempfile
import threading
import time

from .lib import func, moment

_DB_NAME = os.getenv("IPCTOOLS_STORE") or str(
    pathlib.Path(tempfile.gettempdir()) / ".ipctools_store"
)
_lock = threading.Lock()


[docs] class StoreError(RuntimeError): """Raised when a store operation fails."""
@func.once def _ensure_store(cur: sqlite3.Cursor) -> None: logging.getLogger(__name__).debug("ensuring the store exists") cur.execute("CREATE TABLE IF NOT EXISTS store (key TEXT PRIMARY KEY, value TEXT NOT NULL)")
[docs] def get_value(key: str) -> str | None: """Get a value from the store. Return the value, or None if the key doesn't exist. Raises a StoreError if the value can't be gotten, e.g., because the store has not been initialized yet. """ with ( contextlib.closing(sqlite3.connect(_DB_NAME, autocommit=False)) as con, contextlib.closing(con.cursor()) as cur, _lock, ): params = (key,) try: res = cur.execute("SELECT value FROM store WHERE key = ?", params) except sqlite3.OperationalError as exc: msg = f"unable to get the value for {key!r}" raise StoreError(msg) from exc ret = res.fetchone() return ret[0] if ret else None
[docs] def set_value(key: str, value: str, *, _timeout: float = 5) -> None: """Set a value in the store. If the key already exists, its value will be updated. Raises a StoreError if the value can't be set, e.g., because another process is currently setting the value. """ with ( contextlib.closing(sqlite3.connect(_DB_NAME, autocommit=False, timeout=_timeout)) as con, contextlib.closing(con.cursor()) as cur, _lock, ): try: _ensure_store(cur) cur.execute( "INSERT INTO store(key, value)" " VALUES(?, ?)" "ON CONFLICT(key) DO UPDATE SET value=excluded.value", (key, value), ) con.commit() except sqlite3.OperationalError as exc: con.rollback() msg = f"unable to set value for {key!r} to {value!r}" raise StoreError(msg) from exc
[docs] def create_value(key: str, value: str) -> bool: """Create a value in the store. If the key already exists, its value will not be updated. Returns True if the value was created, or False otherwise. """ with ( contextlib.closing(sqlite3.connect(_DB_NAME, autocommit=False)) as con, contextlib.closing(con.cursor()) as cur, _lock, ): try: _ensure_store(cur) cur.execute("INSERT INTO store(key, value) VALUES(?, ?)", (key, value)) con.commit() except (sqlite3.IntegrityError, sqlite3.OperationalError): con.rollback() return False else: return True
[docs] def try_get_value(key: str) -> str | None: """Try to get a value from the store. Returns the value, or None if the key doesn't exist, or there was a StoreError. """ try: res = get_value(key) except StoreError: return None else: return res
[docs] def try_set_value(key: str, value: str, *, retries: int = 0, _timeout: float = 5) -> bool: """Try to set a value in the store, with optional retries. Returns True if the value was set. Otherwise False. """ try: set_value(key, value, _timeout=_timeout) except StoreError: if retries <= 0: return False delay = moment.delay(1, 10, tick_size=0.05) retries -= 1 logging.getLogger(__name__).debug( "waiting for %.2fs before retrying; %d retries left", delay, retries ) time.sleep(delay) return try_set_value(key, value, retries=retries, _timeout=_timeout) else: return True