--- /dev/null
+import sqlite3
+import sys
+
+from argparse import ArgumentParser
+from code import InteractiveConsole
+from textwrap import dedent
+
+
+def execute(c, sql, suppress_errors=True):
+ try:
+ for row in c.execute(sql):
+ print(row)
+ except sqlite3.Error as e:
+ tp = type(e).__name__
+ try:
+ print(f"{tp} ({e.sqlite_errorname}): {e}", file=sys.stderr)
+ except AttributeError:
+ print(f"{tp}: {e}", file=sys.stderr)
+ if not suppress_errors:
+ sys.exit(1)
+
+
+class SqliteInteractiveConsole(InteractiveConsole):
+
+ def __init__(self, connection):
+ super().__init__()
+ self._con = connection
+ self._cur = connection.cursor()
+
+ def runsource(self, source, filename="<input>", symbol="single"):
+ match source:
+ case ".version":
+ print(f"{sqlite3.sqlite_version}")
+ case ".help":
+ print("Enter SQL code and press enter.")
+ case ".quit":
+ sys.exit(0)
+ case _:
+ if not sqlite3.complete_statement(source):
+ return True
+ execute(self._cur, source)
+ return False
+
+
+def main():
+ parser = ArgumentParser(
+ description="Python sqlite3 CLI",
+ prog="python -m sqlite3",
+ )
+ parser.add_argument(
+ "filename", type=str, default=":memory:", nargs="?",
+ help=(
+ "SQLite database to open (defaults to ':memory:'). "
+ "A new database is created if the file does not previously exist."
+ ),
+ )
+ parser.add_argument(
+ "sql", type=str, nargs="?",
+ help=(
+ "An SQL query to execute. "
+ "Any returned rows are printed to stdout."
+ ),
+ )
+ parser.add_argument(
+ "-v", "--version", action="version",
+ version=f"SQLite version {sqlite3.sqlite_version}",
+ help="Print underlying SQLite library version",
+ )
+ args = parser.parse_args()
+
+ if args.filename == ":memory:":
+ db_name = "a transient in-memory database"
+ else:
+ db_name = repr(args.filename)
+
+ banner = dedent(f"""
+ sqlite3 shell, running on SQLite version {sqlite3.sqlite_version}
+ Connected to {db_name}
+
+ Each command will be run using execute() on the cursor.
+ Type ".help" for more information; type ".quit" or CTRL-D to quit.
+ """).strip()
+ sys.ps1 = "sqlite> "
+ sys.ps2 = " ... "
+
+ con = sqlite3.connect(args.filename, isolation_level=None)
+ try:
+ if args.sql:
+ execute(con, args.sql, suppress_errors=False)
+ else:
+ console = SqliteInteractiveConsole(con)
+ console.interact(banner, exitmsg="")
+ finally:
+ con.close()
+
+
+main()
--- /dev/null
+"""sqlite3 CLI tests."""
+
+import sqlite3 as sqlite
+import subprocess
+import sys
+import unittest
+
+from test.support import SHORT_TIMEOUT, requires_subprocess
+from test.support.os_helper import TESTFN, unlink
+
+
+@requires_subprocess()
+class CommandLineInterface(unittest.TestCase):
+
+ def _do_test(self, *args, expect_success=True):
+ with subprocess.Popen(
+ [sys.executable, "-Xutf8", "-m", "sqlite3", *args],
+ encoding="utf-8",
+ bufsize=0,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ ) as proc:
+ proc.wait()
+ if expect_success == bool(proc.returncode):
+ self.fail("".join(proc.stderr))
+ stdout = proc.stdout.read()
+ stderr = proc.stderr.read()
+ if expect_success:
+ self.assertEqual(stderr, "")
+ else:
+ self.assertEqual(stdout, "")
+ return stdout, stderr
+
+ def expect_success(self, *args):
+ out, _ = self._do_test(*args)
+ return out
+
+ def expect_failure(self, *args):
+ _, err = self._do_test(*args, expect_success=False)
+ return err
+
+ def test_cli_help(self):
+ out = self.expect_success("-h")
+ self.assertIn("usage: python -m sqlite3", out)
+
+ def test_cli_version(self):
+ out = self.expect_success("-v")
+ self.assertIn(sqlite.sqlite_version, out)
+
+ def test_cli_execute_sql(self):
+ out = self.expect_success(":memory:", "select 1")
+ self.assertIn("(1,)", out)
+
+ def test_cli_execute_too_much_sql(self):
+ stderr = self.expect_failure(":memory:", "select 1; select 2")
+ err = "ProgrammingError: You can only execute one statement at a time"
+ self.assertIn(err, stderr)
+
+ def test_cli_execute_incomplete_sql(self):
+ stderr = self.expect_failure(":memory:", "sel")
+ self.assertIn("OperationalError (SQLITE_ERROR)", stderr)
+
+ def test_cli_on_disk_db(self):
+ self.addCleanup(unlink, TESTFN)
+ out = self.expect_success(TESTFN, "create table t(t)")
+ self.assertEqual(out, "")
+ out = self.expect_success(TESTFN, "select count(t) from t")
+ self.assertIn("(0,)", out)
+
+
+@requires_subprocess()
+class InteractiveSession(unittest.TestCase):
+ TIMEOUT = SHORT_TIMEOUT / 10.
+ MEMORY_DB_MSG = "Connected to a transient in-memory database"
+ PS1 = "sqlite> "
+ PS2 = "... "
+
+ def start_cli(self, *args):
+ return subprocess.Popen(
+ [sys.executable, "-Xutf8", "-m", "sqlite3", *args],
+ encoding="utf-8",
+ bufsize=0,
+ stdin=subprocess.PIPE,
+ # Note: the banner is printed to stderr, the prompt to stdout.
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+
+ def expect_success(self, proc):
+ proc.wait()
+ if proc.returncode:
+ self.fail("".join(proc.stderr))
+
+ def test_interact(self):
+ with self.start_cli() as proc:
+ out, err = proc.communicate(timeout=self.TIMEOUT)
+ self.assertIn(self.MEMORY_DB_MSG, err)
+ self.assertIn(self.PS1, out)
+ self.expect_success(proc)
+
+ def test_interact_quit(self):
+ with self.start_cli() as proc:
+ out, err = proc.communicate(input=".quit", timeout=self.TIMEOUT)
+ self.assertIn(self.MEMORY_DB_MSG, err)
+ self.assertIn(self.PS1, out)
+ self.expect_success(proc)
+
+ def test_interact_version(self):
+ with self.start_cli() as proc:
+ out, err = proc.communicate(input=".version", timeout=self.TIMEOUT)
+ self.assertIn(self.MEMORY_DB_MSG, err)
+ self.assertIn(sqlite.sqlite_version, out)
+ self.expect_success(proc)
+
+ def test_interact_valid_sql(self):
+ with self.start_cli() as proc:
+ out, err = proc.communicate(input="select 1;",
+ timeout=self.TIMEOUT)
+ self.assertIn(self.MEMORY_DB_MSG, err)
+ self.assertIn("(1,)", out)
+ self.expect_success(proc)
+
+ def test_interact_valid_multiline_sql(self):
+ with self.start_cli() as proc:
+ out, err = proc.communicate(input="select 1\n;",
+ timeout=self.TIMEOUT)
+ self.assertIn(self.MEMORY_DB_MSG, err)
+ self.assertIn(self.PS2, out)
+ self.assertIn("(1,)", out)
+ self.expect_success(proc)
+
+ def test_interact_invalid_sql(self):
+ with self.start_cli() as proc:
+ out, err = proc.communicate(input="sel;", timeout=self.TIMEOUT)
+ self.assertIn(self.MEMORY_DB_MSG, err)
+ self.assertIn("OperationalError (SQLITE_ERROR)", err)
+ self.expect_success(proc)
+
+ def test_interact_on_disk_file(self):
+ self.addCleanup(unlink, TESTFN)
+ with self.start_cli(TESTFN) as proc:
+ out, err = proc.communicate(input="create table t(t);",
+ timeout=self.TIMEOUT)
+ self.assertIn(TESTFN, err)
+ self.assertIn(self.PS1, out)
+ self.expect_success(proc)
+ with self.start_cli(TESTFN, "select count(t) from t") as proc:
+ out = proc.stdout.read()
+ err = proc.stderr.read()
+ self.assertIn("(0,)", out)
+ self.expect_success(proc)
+
+
+if __name__ == "__main__":
+ unittest.main()