--------------
-:mod:`dbm` is a generic interface to variants of the DBM database ---
-:mod:`dbm.gnu` or :mod:`dbm.ndbm`. If none of these modules is installed, the
+:mod:`dbm` is a generic interface to variants of the DBM database:
+
+* :mod:`dbm.sqlite3`
+* :mod:`dbm.gnu`
+* :mod:`dbm.ndbm`
+
+If none of these modules are installed, the
slow-but-simple implementation in module :mod:`dbm.dumb` will be used. There
is a `third party interface <https://www.jcea.es/programacion/pybsddb.htm>`_ to
the Oracle Berkeley DB.
.. function:: whichdb(filename)
This function attempts to guess which of the several simple database modules
- available --- :mod:`dbm.gnu`, :mod:`dbm.ndbm` or :mod:`dbm.dumb` --- should
- be used to open a given file.
+ available --- :mod:`dbm.sqlite3`, :mod:`dbm.gnu`, :mod:`dbm.ndbm`,
+ or :mod:`dbm.dumb` --- should be used to open a given file.
Return one of the following values:
The individual submodules are described in the following sections.
+:mod:`dbm.sqlite3` --- SQLite backend for dbm
+---------------------------------------------
+
+.. module:: dbm.sqlite3
+ :platform: All
+ :synopsis: SQLite backend for dbm
+
+.. versionadded:: 3.13
+
+**Source code:** :source:`Lib/dbm/sqlite3.py`
+
+--------------
+
+This module uses the standard library :mod:`sqlite3` module to provide an
+SQLite backend for the :mod:`dbm` module.
+The files created by :mod:`dbm.sqlite3` can thus be opened by :mod:`sqlite3`,
+or any other SQLite browser, including the SQLite CLI.
+
+.. function:: open(filename, /, flag="r", mode=0o666)
+
+ Open an SQLite database.
+ The returned object behaves like a :term:`mapping`,
+ implements a :meth:`!close` method,
+ and supports a "closing" context manager via the :keyword:`with` keyword.
+
+ :param filename:
+ The path to the database to be opened.
+ :type filename: :term:`path-like object`
+
+ :param str flag:
+
+ * ``'r'`` (default): |flag_r|
+ * ``'w'``: |flag_w|
+ * ``'c'``: |flag_c|
+ * ``'n'``: |flag_n|
+
+ :param mode:
+ The Unix file access mode of the file (default: octal ``0o666``),
+ used only when the database has to be created.
+
:mod:`dbm.gnu` --- GNU database manager
---------------------------------------
--- /dev/null
+import os
+import sqlite3
+import sys
+from pathlib import Path
+from contextlib import suppress, closing
+from collections.abc import MutableMapping
+
+BUILD_TABLE = """
+ CREATE TABLE IF NOT EXISTS Dict (
+ key BLOB UNIQUE NOT NULL,
+ value BLOB NOT NULL
+ )
+"""
+GET_SIZE = "SELECT COUNT (key) FROM Dict"
+LOOKUP_KEY = "SELECT value FROM Dict WHERE key = CAST(? AS BLOB)"
+STORE_KV = "REPLACE INTO Dict (key, value) VALUES (CAST(? AS BLOB), CAST(? AS BLOB))"
+DELETE_KEY = "DELETE FROM Dict WHERE key = CAST(? AS BLOB)"
+ITER_KEYS = "SELECT key FROM Dict"
+
+
+class error(OSError):
+ pass
+
+
+_ERR_CLOSED = "DBM object has already been closed"
+_ERR_REINIT = "DBM object does not support reinitialization"
+
+
+def _normalize_uri(path):
+ path = Path(path)
+ uri = path.absolute().as_uri()
+ while "//" in uri:
+ uri = uri.replace("//", "/")
+ return uri
+
+
+class _Database(MutableMapping):
+
+ def __init__(self, path, /, *, flag, mode):
+ if hasattr(self, "_cx"):
+ raise error(_ERR_REINIT)
+
+ path = os.fsdecode(path)
+ match flag:
+ case "r":
+ flag = "ro"
+ case "w":
+ flag = "rw"
+ case "c":
+ flag = "rwc"
+ Path(path).touch(mode=mode, exist_ok=True)
+ case "n":
+ flag = "rwc"
+ Path(path).unlink(missing_ok=True)
+ Path(path).touch(mode=mode)
+ case _:
+ raise ValueError("Flag must be one of 'r', 'w', 'c', or 'n', "
+ f"not {flag!r}")
+
+ # We use the URI format when opening the database.
+ uri = _normalize_uri(path)
+ uri = f"{uri}?mode={flag}"
+
+ try:
+ self._cx = sqlite3.connect(uri, autocommit=True, uri=True)
+ except sqlite3.Error as exc:
+ raise error(str(exc))
+
+ # This is an optimization only; it's ok if it fails.
+ with suppress(sqlite3.OperationalError):
+ self._cx.execute("PRAGMA journal_mode = wal")
+
+ if flag == "rwc":
+ self._execute(BUILD_TABLE)
+
+ def _execute(self, *args, **kwargs):
+ if not self._cx:
+ raise error(_ERR_CLOSED)
+ try:
+ return closing(self._cx.execute(*args, **kwargs))
+ except sqlite3.Error as exc:
+ raise error(str(exc))
+
+ def __len__(self):
+ with self._execute(GET_SIZE) as cu:
+ row = cu.fetchone()
+ return row[0]
+
+ def __getitem__(self, key):
+ with self._execute(LOOKUP_KEY, (key,)) as cu:
+ row = cu.fetchone()
+ if not row:
+ raise KeyError(key)
+ return row[0]
+
+ def __setitem__(self, key, value):
+ self._execute(STORE_KV, (key, value))
+
+ def __delitem__(self, key):
+ with self._execute(DELETE_KEY, (key,)) as cu:
+ if not cu.rowcount:
+ raise KeyError(key)
+
+ def __iter__(self):
+ try:
+ with self._execute(ITER_KEYS) as cu:
+ for row in cu:
+ yield row[0]
+ except sqlite3.Error as exc:
+ raise error(str(exc))
+
+ def close(self):
+ if self._cx:
+ self._cx.close()
+ self._cx = None
+
+ def keys(self):
+ return list(super().keys())
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.close()
+
+
+def open(filename, /, flag="r", mode=0o666):
+ """Open a dbm.sqlite3 database and return the dbm object.
+
+ The 'filename' parameter is the name of the database file.
+
+ The optional 'flag' parameter can be one of ...:
+ 'r' (default): open an existing database for read only access
+ 'w': open an existing database for read/write access
+ 'c': create a database if it does not exist; open for read/write access
+ 'n': always create a new, empty database; open for read/write access
+
+ The optional 'mode' parameter is the Unix file access mode of the database;
+ only used when creating a new database. Default: 0o666.
+ """
+ return _Database(filename, flag=flag, mode=mode)
--- /dev/null
+import sqlite3
+import sys
+import test.support
+import unittest
+from contextlib import closing
+from functools import partial
+from pathlib import Path
+from test.support import cpython_only, import_helper, os_helper
+
+
+dbm_sqlite3 = import_helper.import_module("dbm.sqlite3")
+from dbm.sqlite3 import _normalize_uri
+
+
+class _SQLiteDbmTests(unittest.TestCase):
+
+ def setUp(self):
+ self.filename = os_helper.TESTFN
+ db = dbm_sqlite3.open(self.filename, "c")
+ db.close()
+
+ def tearDown(self):
+ for suffix in "", "-wal", "-shm":
+ os_helper.unlink(self.filename + suffix)
+
+
+class URI(unittest.TestCase):
+
+ def test_uri_substitutions(self):
+ dataset = (
+ ("/absolute/////b/c", "/absolute/b/c"),
+ ("PRE#MID##END", "PRE%23MID%23%23END"),
+ ("%#?%%#", "%25%23%3F%25%25%23"),
+ )
+ for path, normalized in dataset:
+ with self.subTest(path=path, normalized=normalized):
+ self.assertTrue(_normalize_uri(path).endswith(normalized))
+
+ @unittest.skipUnless(sys.platform == "win32", "requires Windows")
+ def test_uri_windows(self):
+ dataset = (
+ # Relative subdir.
+ (r"2018\January.xlsx",
+ "2018/January.xlsx"),
+ # Absolute with drive letter.
+ (r"C:\Projects\apilibrary\apilibrary.sln",
+ "/C:/Projects/apilibrary/apilibrary.sln"),
+ # Relative with drive letter.
+ (r"C:Projects\apilibrary\apilibrary.sln",
+ "/C:Projects/apilibrary/apilibrary.sln"),
+ )
+ for path, normalized in dataset:
+ with self.subTest(path=path, normalized=normalized):
+ if not Path(path).is_absolute():
+ self.skipTest(f"skipping relative path: {path!r}")
+ self.assertTrue(_normalize_uri(path).endswith(normalized))
+
+
+class ReadOnly(_SQLiteDbmTests):
+
+ def setUp(self):
+ super().setUp()
+ with dbm_sqlite3.open(self.filename, "w") as db:
+ db[b"key1"] = "value1"
+ db[b"key2"] = "value2"
+ self.db = dbm_sqlite3.open(self.filename, "r")
+
+ def tearDown(self):
+ self.db.close()
+ super().tearDown()
+
+ def test_readonly_read(self):
+ self.assertEqual(self.db[b"key1"], b"value1")
+ self.assertEqual(self.db[b"key2"], b"value2")
+
+ def test_readonly_write(self):
+ with self.assertRaises(dbm_sqlite3.error):
+ self.db[b"new"] = "value"
+
+ def test_readonly_delete(self):
+ with self.assertRaises(dbm_sqlite3.error):
+ del self.db[b"key1"]
+
+ def test_readonly_keys(self):
+ self.assertEqual(self.db.keys(), [b"key1", b"key2"])
+
+ def test_readonly_iter(self):
+ self.assertEqual([k for k in self.db], [b"key1", b"key2"])
+
+
+class ReadWrite(_SQLiteDbmTests):
+
+ def setUp(self):
+ super().setUp()
+ self.db = dbm_sqlite3.open(self.filename, "w")
+
+ def tearDown(self):
+ self.db.close()
+ super().tearDown()
+
+ def db_content(self):
+ with closing(sqlite3.connect(self.filename)) as cx:
+ keys = [r[0] for r in cx.execute("SELECT key FROM Dict")]
+ vals = [r[0] for r in cx.execute("SELECT value FROM Dict")]
+ return keys, vals
+
+ def test_readwrite_unique_key(self):
+ self.db["key"] = "value"
+ self.db["key"] = "other"
+ keys, vals = self.db_content()
+ self.assertEqual(keys, [b"key"])
+ self.assertEqual(vals, [b"other"])
+
+ def test_readwrite_delete(self):
+ self.db["key"] = "value"
+ self.db["new"] = "other"
+
+ del self.db[b"new"]
+ keys, vals = self.db_content()
+ self.assertEqual(keys, [b"key"])
+ self.assertEqual(vals, [b"value"])
+
+ del self.db[b"key"]
+ keys, vals = self.db_content()
+ self.assertEqual(keys, [])
+ self.assertEqual(vals, [])
+
+ def test_readwrite_null_key(self):
+ with self.assertRaises(dbm_sqlite3.error):
+ self.db[None] = "value"
+
+ def test_readwrite_null_value(self):
+ with self.assertRaises(dbm_sqlite3.error):
+ self.db[b"key"] = None
+
+
+class Misuse(_SQLiteDbmTests):
+
+ def setUp(self):
+ super().setUp()
+ self.db = dbm_sqlite3.open(self.filename, "w")
+
+ def tearDown(self):
+ self.db.close()
+ super().tearDown()
+
+ def test_misuse_double_create(self):
+ self.db["key"] = "value"
+ with dbm_sqlite3.open(self.filename, "c") as db:
+ self.assertEqual(db[b"key"], b"value")
+
+ def test_misuse_double_close(self):
+ self.db.close()
+
+ def test_misuse_invalid_flag(self):
+ regex = "must be.*'r'.*'w'.*'c'.*'n', not 'invalid'"
+ with self.assertRaisesRegex(ValueError, regex):
+ dbm_sqlite3.open(self.filename, flag="invalid")
+
+ def test_misuse_double_delete(self):
+ self.db["key"] = "value"
+ del self.db[b"key"]
+ with self.assertRaises(KeyError):
+ del self.db[b"key"]
+
+ def test_misuse_invalid_key(self):
+ with self.assertRaises(KeyError):
+ self.db[b"key"]
+
+ def test_misuse_iter_close1(self):
+ self.db["1"] = 1
+ it = iter(self.db)
+ self.db.close()
+ with self.assertRaises(dbm_sqlite3.error):
+ next(it)
+
+ def test_misuse_iter_close2(self):
+ self.db["1"] = 1
+ self.db["2"] = 2
+ it = iter(self.db)
+ next(it)
+ self.db.close()
+ with self.assertRaises(dbm_sqlite3.error):
+ next(it)
+
+ def test_misuse_use_after_close(self):
+ self.db.close()
+ with self.assertRaises(dbm_sqlite3.error):
+ self.db[b"read"]
+ with self.assertRaises(dbm_sqlite3.error):
+ self.db[b"write"] = "value"
+ with self.assertRaises(dbm_sqlite3.error):
+ del self.db[b"del"]
+ with self.assertRaises(dbm_sqlite3.error):
+ len(self.db)
+ with self.assertRaises(dbm_sqlite3.error):
+ self.db.keys()
+
+ def test_misuse_reinit(self):
+ with self.assertRaises(dbm_sqlite3.error):
+ self.db.__init__("new.db", flag="n", mode=0o666)
+
+ def test_misuse_empty_filename(self):
+ for flag in "r", "w", "c", "n":
+ with self.assertRaises(dbm_sqlite3.error):
+ db = dbm_sqlite3.open("", flag="c")
+
+
+class DataTypes(_SQLiteDbmTests):
+
+ dataset = (
+ # (raw, coerced)
+ (42, b"42"),
+ (3.14, b"3.14"),
+ ("string", b"string"),
+ (b"bytes", b"bytes"),
+ )
+
+ def setUp(self):
+ super().setUp()
+ self.db = dbm_sqlite3.open(self.filename, "w")
+
+ def tearDown(self):
+ self.db.close()
+ super().tearDown()
+
+ def test_datatypes_values(self):
+ for raw, coerced in self.dataset:
+ with self.subTest(raw=raw, coerced=coerced):
+ self.db["key"] = raw
+ self.assertEqual(self.db[b"key"], coerced)
+
+ def test_datatypes_keys(self):
+ for raw, coerced in self.dataset:
+ with self.subTest(raw=raw, coerced=coerced):
+ self.db[raw] = "value"
+ self.assertEqual(self.db[coerced], b"value")
+ # Raw keys are silently coerced to bytes.
+ self.assertEqual(self.db[raw], b"value")
+ del self.db[raw]
+
+ def test_datatypes_replace_coerced(self):
+ self.db["10"] = "value"
+ self.db[b"10"] = "value"
+ self.db[10] = "value"
+ self.assertEqual(self.db.keys(), [b"10"])
+
+
+class CorruptDatabase(_SQLiteDbmTests):
+ """Verify that database exceptions are raised as dbm.sqlite3.error."""
+
+ def setUp(self):
+ super().setUp()
+ with closing(sqlite3.connect(self.filename)) as cx:
+ with cx:
+ cx.execute("DROP TABLE IF EXISTS Dict")
+ cx.execute("CREATE TABLE Dict (invalid_schema)")
+
+ def check(self, flag, fn, should_succeed=False):
+ with closing(dbm_sqlite3.open(self.filename, flag)) as db:
+ with self.assertRaises(dbm_sqlite3.error):
+ fn(db)
+
+ @staticmethod
+ def read(db):
+ return db["key"]
+
+ @staticmethod
+ def write(db):
+ db["key"] = "value"
+
+ @staticmethod
+ def iter(db):
+ next(iter(db))
+
+ @staticmethod
+ def keys(db):
+ db.keys()
+
+ @staticmethod
+ def del_(db):
+ del db["key"]
+
+ @staticmethod
+ def len_(db):
+ len(db)
+
+ def test_corrupt_readwrite(self):
+ for flag in "r", "w", "c":
+ with self.subTest(flag=flag):
+ check = partial(self.check, flag=flag)
+ check(fn=self.read)
+ check(fn=self.write)
+ check(fn=self.iter)
+ check(fn=self.keys)
+ check(fn=self.del_)
+ check(fn=self.len_)
+
+ def test_corrupt_force_new(self):
+ with closing(dbm_sqlite3.open(self.filename, "n")) as db:
+ db["foo"] = "write"
+ _ = db[b"foo"]
+ next(iter(db))
+ del db[b"foo"]
+
+
+if __name__ == "__main__":
+ unittest.main()