]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-77617: Add sqlite3 command-line interface (#95026)
authorErlend Egeberg Aasland <erlend.aasland@protonmail.com>
Mon, 1 Aug 2022 10:25:16 +0000 (12:25 +0200)
committerGitHub <noreply@github.com>
Mon, 1 Aug 2022 10:25:16 +0000 (12:25 +0200)
Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
Doc/library/sqlite3.rst
Doc/whatsnew/3.12.rst
Lib/sqlite3/__main__.py [new file with mode: 0644]
Lib/test/test_sqlite3/test_cli.py [new file with mode: 0644]
Misc/NEWS.d/next/Library/2022-07-20-00-23-58.gh-issue-77617.XGaqSQ.rst [new file with mode: 0644]

index 35eb404513b4d103c5d8dcf8340e26c89f1a212d..28bd7ce900b53cf63722566d2d8ef491b23f5ce9 100644 (file)
@@ -1442,6 +1442,26 @@ and you can let the ``sqlite3`` module convert SQLite types to
 Python types via :ref:`converters <sqlite3-converters>`.
 
 
+.. _sqlite3-cli:
+
+Command-line interface
+^^^^^^^^^^^^^^^^^^^^^^
+
+The ``sqlite3`` module can be invoked as a script
+in order to provide a simple SQLite shell.
+Type ``.quit`` or CTRL-D to exit the shell.
+
+.. program:: python -m sqlite3 [-h] [-v] [filename] [sql]
+
+.. option:: -h, --help
+    Print CLI help.
+
+.. option:: -v, --version
+    Print underlying SQLite library version.
+
+.. versionadded:: 3.12
+
+
 .. _sqlite3-howtos:
 
 How-to guides
index 0c53bc0c1111d73bcb6a6fca72713411527e2841..67396f8e02280bfe0893f2b434e1895b63d9390b 100644 (file)
@@ -112,6 +112,13 @@ os
   (Contributed by Kumar Aditya in :gh:`93312`.)
 
 
+sqlite3
+-------
+
+* Add a :ref:`command-line interface <sqlite3-cli>`.
+  (Contributed by Erlend E. Aasland in :gh:`77617`.)
+
+
 Optimizations
 =============
 
diff --git a/Lib/sqlite3/__main__.py b/Lib/sqlite3/__main__.py
new file mode 100644 (file)
index 0000000..c62fad8
--- /dev/null
@@ -0,0 +1,97 @@
+import sqlite3
+import sys
+
+from argparse import ArgumentParser
+from code import InteractiveConsole
+from textwrap import dedent
+
+
+def execute(c, sql, suppress_errors=True):
+    try:
+        for row in c.execute(sql):
+            print(row)
+    except sqlite3.Error as e:
+        tp = type(e).__name__
+        try:
+            print(f"{tp} ({e.sqlite_errorname}): {e}", file=sys.stderr)
+        except AttributeError:
+            print(f"{tp}: {e}", file=sys.stderr)
+        if not suppress_errors:
+            sys.exit(1)
+
+
+class SqliteInteractiveConsole(InteractiveConsole):
+
+    def __init__(self, connection):
+        super().__init__()
+        self._con = connection
+        self._cur = connection.cursor()
+
+    def runsource(self, source, filename="<input>", symbol="single"):
+        match source:
+            case ".version":
+                print(f"{sqlite3.sqlite_version}")
+            case ".help":
+                print("Enter SQL code and press enter.")
+            case ".quit":
+                sys.exit(0)
+            case _:
+                if not sqlite3.complete_statement(source):
+                    return True
+                execute(self._cur, source)
+        return False
+
+
+def main():
+    parser = ArgumentParser(
+        description="Python sqlite3 CLI",
+        prog="python -m sqlite3",
+    )
+    parser.add_argument(
+        "filename", type=str, default=":memory:", nargs="?",
+        help=(
+            "SQLite database to open (defaults to ':memory:'). "
+            "A new database is created if the file does not previously exist."
+        ),
+    )
+    parser.add_argument(
+        "sql", type=str, nargs="?",
+        help=(
+            "An SQL query to execute. "
+            "Any returned rows are printed to stdout."
+        ),
+    )
+    parser.add_argument(
+        "-v", "--version", action="version",
+        version=f"SQLite version {sqlite3.sqlite_version}",
+        help="Print underlying SQLite library version",
+    )
+    args = parser.parse_args()
+
+    if args.filename == ":memory:":
+        db_name = "a transient in-memory database"
+    else:
+        db_name = repr(args.filename)
+
+    banner = dedent(f"""
+        sqlite3 shell, running on SQLite version {sqlite3.sqlite_version}
+        Connected to {db_name}
+
+        Each command will be run using execute() on the cursor.
+        Type ".help" for more information; type ".quit" or CTRL-D to quit.
+    """).strip()
+    sys.ps1 = "sqlite> "
+    sys.ps2 = "    ... "
+
+    con = sqlite3.connect(args.filename, isolation_level=None)
+    try:
+        if args.sql:
+            execute(con, args.sql, suppress_errors=False)
+        else:
+            console = SqliteInteractiveConsole(con)
+            console.interact(banner, exitmsg="")
+    finally:
+        con.close()
+
+
+main()
diff --git a/Lib/test/test_sqlite3/test_cli.py b/Lib/test/test_sqlite3/test_cli.py
new file mode 100644 (file)
index 0000000..d374f8e
--- /dev/null
@@ -0,0 +1,155 @@
+"""sqlite3 CLI tests."""
+
+import sqlite3 as sqlite
+import subprocess
+import sys
+import unittest
+
+from test.support import SHORT_TIMEOUT, requires_subprocess
+from test.support.os_helper import TESTFN, unlink
+
+
+@requires_subprocess()
+class CommandLineInterface(unittest.TestCase):
+
+    def _do_test(self, *args, expect_success=True):
+        with subprocess.Popen(
+            [sys.executable, "-Xutf8", "-m", "sqlite3", *args],
+            encoding="utf-8",
+            bufsize=0,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+        ) as proc:
+            proc.wait()
+            if expect_success == bool(proc.returncode):
+                self.fail("".join(proc.stderr))
+            stdout = proc.stdout.read()
+            stderr = proc.stderr.read()
+            if expect_success:
+                self.assertEqual(stderr, "")
+            else:
+                self.assertEqual(stdout, "")
+            return stdout, stderr
+
+    def expect_success(self, *args):
+        out, _ = self._do_test(*args)
+        return out
+
+    def expect_failure(self, *args):
+        _, err = self._do_test(*args, expect_success=False)
+        return err
+
+    def test_cli_help(self):
+        out = self.expect_success("-h")
+        self.assertIn("usage: python -m sqlite3", out)
+
+    def test_cli_version(self):
+        out = self.expect_success("-v")
+        self.assertIn(sqlite.sqlite_version, out)
+
+    def test_cli_execute_sql(self):
+        out = self.expect_success(":memory:", "select 1")
+        self.assertIn("(1,)", out)
+
+    def test_cli_execute_too_much_sql(self):
+        stderr = self.expect_failure(":memory:", "select 1; select 2")
+        err = "ProgrammingError: You can only execute one statement at a time"
+        self.assertIn(err, stderr)
+
+    def test_cli_execute_incomplete_sql(self):
+        stderr = self.expect_failure(":memory:", "sel")
+        self.assertIn("OperationalError (SQLITE_ERROR)", stderr)
+
+    def test_cli_on_disk_db(self):
+        self.addCleanup(unlink, TESTFN)
+        out = self.expect_success(TESTFN, "create table t(t)")
+        self.assertEqual(out, "")
+        out = self.expect_success(TESTFN, "select count(t) from t")
+        self.assertIn("(0,)", out)
+
+
+@requires_subprocess()
+class InteractiveSession(unittest.TestCase):
+    TIMEOUT = SHORT_TIMEOUT / 10.
+    MEMORY_DB_MSG = "Connected to a transient in-memory database"
+    PS1 = "sqlite> "
+    PS2 = "... "
+
+    def start_cli(self, *args):
+        return subprocess.Popen(
+            [sys.executable, "-Xutf8", "-m", "sqlite3", *args],
+            encoding="utf-8",
+            bufsize=0,
+            stdin=subprocess.PIPE,
+            # Note: the banner is printed to stderr, the prompt to stdout.
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+        )
+
+    def expect_success(self, proc):
+        proc.wait()
+        if proc.returncode:
+            self.fail("".join(proc.stderr))
+
+    def test_interact(self):
+        with self.start_cli() as proc:
+            out, err = proc.communicate(timeout=self.TIMEOUT)
+            self.assertIn(self.MEMORY_DB_MSG, err)
+            self.assertIn(self.PS1, out)
+            self.expect_success(proc)
+
+    def test_interact_quit(self):
+        with self.start_cli() as proc:
+            out, err = proc.communicate(input=".quit", timeout=self.TIMEOUT)
+            self.assertIn(self.MEMORY_DB_MSG, err)
+            self.assertIn(self.PS1, out)
+            self.expect_success(proc)
+
+    def test_interact_version(self):
+        with self.start_cli() as proc:
+            out, err = proc.communicate(input=".version", timeout=self.TIMEOUT)
+            self.assertIn(self.MEMORY_DB_MSG, err)
+            self.assertIn(sqlite.sqlite_version, out)
+            self.expect_success(proc)
+
+    def test_interact_valid_sql(self):
+        with self.start_cli() as proc:
+            out, err = proc.communicate(input="select 1;",
+                                        timeout=self.TIMEOUT)
+            self.assertIn(self.MEMORY_DB_MSG, err)
+            self.assertIn("(1,)", out)
+            self.expect_success(proc)
+
+    def test_interact_valid_multiline_sql(self):
+        with self.start_cli() as proc:
+            out, err = proc.communicate(input="select 1\n;",
+                                        timeout=self.TIMEOUT)
+            self.assertIn(self.MEMORY_DB_MSG, err)
+            self.assertIn(self.PS2, out)
+            self.assertIn("(1,)", out)
+            self.expect_success(proc)
+
+    def test_interact_invalid_sql(self):
+        with self.start_cli() as proc:
+            out, err = proc.communicate(input="sel;", timeout=self.TIMEOUT)
+            self.assertIn(self.MEMORY_DB_MSG, err)
+            self.assertIn("OperationalError (SQLITE_ERROR)", err)
+            self.expect_success(proc)
+
+    def test_interact_on_disk_file(self):
+        self.addCleanup(unlink, TESTFN)
+        with self.start_cli(TESTFN) as proc:
+            out, err = proc.communicate(input="create table t(t);",
+                                        timeout=self.TIMEOUT)
+            self.assertIn(TESTFN, err)
+            self.assertIn(self.PS1, out)
+            self.expect_success(proc)
+        with self.start_cli(TESTFN, "select count(t) from t") as proc:
+            out = proc.stdout.read()
+            err = proc.stderr.read()
+            self.assertIn("(0,)", out)
+            self.expect_success(proc)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/Misc/NEWS.d/next/Library/2022-07-20-00-23-58.gh-issue-77617.XGaqSQ.rst b/Misc/NEWS.d/next/Library/2022-07-20-00-23-58.gh-issue-77617.XGaqSQ.rst
new file mode 100644 (file)
index 0000000..1cbaa7d
--- /dev/null
@@ -0,0 +1,2 @@
+Add :mod:`sqlite3` :ref:`command-line interface <sqlite3-cli>`.
+Patch by Erlend Aasland.