from argparse import ArgumentParser
from code import InteractiveConsole
from textwrap import dedent
+from _colorize import get_theme, theme_no_color
-def execute(c, sql, suppress_errors=True):
+def execute(c, sql, suppress_errors=True, theme=theme_no_color):
"""Helper that wraps execution of SQL code.
This is used both by the REPL and by direct execution from the CLI.
for row in c.execute(sql):
print(row)
except sqlite3.Error as e:
+ t = theme.traceback
tp = type(e).__name__
try:
- print(f"{tp} ({e.sqlite_errorname}): {e}", file=sys.stderr)
+ tp += f" ({e.sqlite_errorname})"
except AttributeError:
- print(f"{tp}: {e}", file=sys.stderr)
+ pass
+ print(
+ f"{t.type}{tp}{t.reset}: {t.message}{e}{t.reset}", file=sys.stderr
+ )
if not suppress_errors:
sys.exit(1)
class SqliteInteractiveConsole(InteractiveConsole):
"""A simple SQLite REPL."""
- def __init__(self, connection):
+ def __init__(self, connection, use_color=False):
super().__init__()
self._con = connection
self._cur = connection.cursor()
+ self._use_color = use_color
def runsource(self, source, filename="<input>", symbol="single"):
"""Override runsource, the core of the InteractiveConsole REPL.
Return True if more input is needed; buffering is done automatically.
Return False if input is a complete statement ready for execution.
"""
+ theme = get_theme(force_no_color=not self._use_color)
+
if not source or source.isspace():
return False
if source[0] == ".":
case "":
pass
case _ as unknown:
- self.write("Error: unknown command or invalid arguments:"
- f' "{unknown}".\n')
+ t = theme.traceback
+ self.write(f'{t.type}Error{t.reset}:{t.message} unknown'
+ f'command or invalid arguments: "{unknown}".\n{t.reset}')
else:
if not sqlite3.complete_statement(source):
return True
- execute(self._cur, source)
+ execute(self._cur, source, theme=theme)
return False
Each command will be run using execute() on the cursor.
Type ".help" for more information; type ".quit" or {eofkey} to quit.
""").strip()
- sys.ps1 = "sqlite> "
- sys.ps2 = " ... "
+
+ theme = get_theme()
+ s = theme.syntax
+
+ sys.ps1 = f"{s.prompt}sqlite> {s.reset}"
+ sys.ps2 = f"{s.prompt} ... {s.reset}"
con = sqlite3.connect(args.filename, isolation_level=None)
try:
if args.sql:
# SQL statement provided on the command-line; execute it directly.
- execute(con, args.sql, suppress_errors=False)
+ execute(con, args.sql, suppress_errors=False, theme=theme)
else:
# No SQL provided; start the REPL.
- console = SqliteInteractiveConsole(con)
+ console = SqliteInteractiveConsole(con, use_color=True)
try:
import readline # noqa: F401
except ImportError:
captured_stdout,
captured_stderr,
captured_stdin,
- force_not_colorized,
+ force_not_colorized_test_class,
)
+@force_not_colorized_test_class
class CommandLineInterface(unittest.TestCase):
def _do_test(self, *args, expect_success=True):
self.assertEqual(out, "")
return err
- @force_not_colorized
def test_cli_help(self):
out = self.expect_success("-h")
self.assertIn("usage: ", out)
self.assertIn("(0,)", out)
+@force_not_colorized_test_class
class InteractiveSession(unittest.TestCase):
MEMORY_DB_MSG = "Connected to a transient in-memory database"
PS1 = "sqlite> "
out, _ = self.run_cli(TESTFN, commands=("SELECT count(t) FROM t;",))
self.assertIn("(0,)\n", out)
+ def test_color(self):
+ with unittest.mock.patch("_colorize.can_colorize", return_value=True):
+ out, err = self.run_cli(commands="TEXT\n")
+ self.assertIn("\x1b[1;35msqlite> \x1b[0m", out)
+ self.assertIn("\x1b[1;35m ... \x1b[0m\x1b", out)
+ out, err = self.run_cli(commands=("sel;",))
+ self.assertIn('\x1b[1;35mOperationalError (SQLITE_ERROR)\x1b[0m: '
+ '\x1b[35mnear "sel": syntax error\x1b[0m', err)
if __name__ == "__main__":
unittest.main()