"""Persist data to a key-value store.
The store is a file. By default it is in the system temporary directory, but
its full path may be specified with the ``IPCTOOLS_STORE`` environment
variable.
"""
import contextlib
import logging
import os
import pathlib
import sqlite3
import tempfile
import threading
import time
from .lib import func, moment
# Path of the SQLite file backing the store: the IPCTOOLS_STORE environment
# variable when it is set to a non-empty value, otherwise a hidden file in
# the system temporary directory.
_DB_NAME = os.environ.get("IPCTOOLS_STORE") or str(
    pathlib.Path(tempfile.gettempdir()).joinpath(".ipctools_store")
)

# Module-wide lock held by the store operations while they use the database.
_lock = threading.Lock()
[docs]
class StoreError(RuntimeError):
    """Signal that reading from or writing to the store failed."""
@func.once
def _ensure_store(cur: sqlite3.Cursor) -> None:
    """Create the ``store`` table through *cur* unless it already exists.

    The table maps a unique text ``key`` to a non-null text ``value``.

    NOTE(review): ``func.once`` is a project helper not visible here;
    presumably it limits this to one execution per process -- confirm.
    """
    log = logging.getLogger(__name__)
    log.debug("ensuring the store exists")
    create_table = (
        "CREATE TABLE IF NOT EXISTS store "
        "(key TEXT PRIMARY KEY, value TEXT NOT NULL)"
    )
    cur.execute(create_table)
[docs]
def get_value(key: str) -> str | None:
    """Get a value from the store.

    Return the value stored under *key*, or None if the key doesn't exist.

    Raises a StoreError if the value can't be gotten, e.g., because the store
    has not been initialized yet or the store file can't be opened.
    """
    try:
        with (
            contextlib.closing(sqlite3.connect(_DB_NAME, autocommit=False)) as con,
            contextlib.closing(con.cursor()) as cur,
            _lock,
        ):
            row = cur.execute("SELECT value FROM store WHERE key = ?", (key,)).fetchone()
    except sqlite3.OperationalError as exc:
        # The handler spans connecting, querying and fetching, so an
        # OperationalError from any of those steps surfaces as the documented
        # StoreError instead of leaking the raw sqlite3 exception.
        msg = f"unable to get the value for {key!r}"
        raise StoreError(msg) from exc
    return row[0] if row else None
[docs]
def set_value(key: str, value: str, *, _timeout: float = 5) -> None:
    """Set a value in the store.

    If the key already exists, its value will be updated. ``_timeout`` is
    passed to ``sqlite3.connect`` as its ``timeout`` argument.

    Raises a StoreError if the value can't be set, e.g., because another
    process is currently setting the value or the store file can't be opened.
    """
    try:
        with (
            contextlib.closing(
                sqlite3.connect(_DB_NAME, autocommit=False, timeout=_timeout)
            ) as con,
            contextlib.closing(con.cursor()) as cur,
            _lock,
        ):
            try:
                # NOTE(review): the table creation shares this transaction; if
                # func.once never re-runs _ensure_store, a rollback here would
                # also undo the CREATE TABLE -- confirm func.once semantics.
                _ensure_store(cur)
                cur.execute(
                    "INSERT INTO store(key, value)"
                    " VALUES(?, ?)"
                    "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
                    (key, value),
                )
                con.commit()
            except sqlite3.OperationalError:
                # Undo the partial write, then let the outer handler translate.
                con.rollback()
                raise
    except sqlite3.OperationalError as exc:
        # Also reached when connecting (or the rollback itself) fails, so
        # those failures surface as StoreError too rather than escaping raw.
        msg = f"unable to set value for {key!r} to {value!r}"
        raise StoreError(msg) from exc
[docs]
def create_value(key: str, value: str) -> bool:
    """Insert a new key/value pair into the store.

    An existing key is left untouched.

    Returns True when the row was inserted and committed. Returns False when
    the insert raised ``sqlite3.IntegrityError`` or
    ``sqlite3.OperationalError``; the transaction is rolled back in that case.
    """
    insert_sql = "INSERT INTO store(key, value) VALUES(?, ?)"
    with (
        contextlib.closing(sqlite3.connect(_DB_NAME, autocommit=False)) as db,
        contextlib.closing(db.cursor()) as cursor,
        _lock,
    ):
        created = True
        try:
            _ensure_store(cursor)
            cursor.execute(insert_sql, (key, value))
            db.commit()
        except (sqlite3.IntegrityError, sqlite3.OperationalError):
            db.rollback()
            created = False
    return created
[docs]
def try_get_value(key: str) -> str | None:
    """Look up *key* in the store without raising StoreError.

    Returns whatever ``get_value`` returns (the value, or None for a missing
    key). A StoreError raised by ``get_value`` is swallowed and reported as
    None as well.
    """
    try:
        return get_value(key)
    except StoreError:
        return None
[docs]
def try_set_value(key: str, value: str, *, retries: int = 0, _timeout: float = 5) -> bool:
    """Try to set a value in the store, with optional retries.

    ``retries`` is the number of additional attempts made after a failed one;
    with the default of 0 a single attempt is made. ``_timeout`` is forwarded
    to ``set_value`` on every attempt.

    Returns True if the value was set. Otherwise False.
    """
    # Retry in a loop rather than by recursing from inside the except handler:
    # the stack depth no longer grows with ``retries`` and the wait happens
    # after the failed attempt's exception has been fully handled.
    while True:
        try:
            set_value(key, value, _timeout=_timeout)
        except StoreError:
            if retries <= 0:
                return False
        else:
            return True

        # NOTE(review): moment.delay is a project helper; presumably it picks
        # a delay between 1 and 10 seconds in 0.05 steps -- confirm.
        delay = moment.delay(1, 10, tick_size=0.05)
        retries -= 1
        logging.getLogger(__name__).debug(
            "waiting for %.2fs before retrying; %d retries left", delay, retries
        )
        time.sleep(delay)