}
-@generate_driver_url.for_db("sqlite")
-def generate_driver_url(url, driver, query_str):
- if driver == "pysqlcipher" and url.get_driver_name() != "pysqlcipher":
- if url.database:
- url = url.set(database=url.database + ".enc")
+def _format_url(url, driver, ident):
+ """given a sqlite url + desired driver + ident, make a canonical
+ URL out of it
+
+ """
+ url = sa_url.make_url(url)
+
+ if driver is None:
+ driver = url.get_driver_name()
+
+ filename = url.database
+
+ needs_enc = driver == "pysqlcipher"
+ name_token = None
+
+ if filename and filename != ":memory:":
+ assert "test_schema" not in filename
+ tokens = re.split(r"[_\.]", filename)
+
+ new_filename = f"{driver}"
+
+ for token in tokens:
+ if token in _drivernames:
+ if driver is None:
+ driver = token
+ continue
+ elif token in ("db", "enc"):
+ continue
+ elif name_token is None:
+ name_token = token.strip("_")
+
+ assert name_token, f"sqlite filename has no name token: {url.database}"
+
+ new_filename = f"{name_token}_{driver}"
+ if ident:
+ new_filename += f"_{ident}"
+ new_filename += ".db"
+ if needs_enc:
+ new_filename += ".enc"
+ url = url.set(database=new_filename)
+
+ if needs_enc:
url = url.set(password="test")
+
url = url.set(drivername="sqlite+%s" % (driver,))
+
+ return url
+
+
+@generate_driver_url.for_db("sqlite")
+def generate_driver_url(url, driver, query_str):
+ url = _format_url(url, driver, None)
+
try:
url.get_dialect()
except exc.NoSuchModuleError:
@follower_url_from_main.for_db("sqlite")
def _sqlite_follower_url_from_main(url, ident):
- url = sa_url.make_url(url)
-
- if not url.database or url.database == ":memory:":
- return url
- else:
-
- m = re.match(r"(.+?)\.(.+)$", url.database)
- name, ext = m.group(1, 2)
- drivername = url.get_driver_name()
- return sa_url.make_url(
- "sqlite+%s:///%s_%s.%s" % (drivername, drivername, ident, ext)
- )
+ return _format_url(url, None, ident)
@post_configure_engine.for_db("sqlite")
def _sqlite_post_configure_engine(url, engine, follower_ident):
from sqlalchemy import event
+ if follower_ident:
+ attach_path = f"{follower_ident}_{engine.driver}_test_schema.db"
+ else:
+ attach_path = f"{engine.driver}_test_schema.db"
+
@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
# use file DBs in all cases, memory acts kind of strangely
# as an attached
- if not follower_ident:
- # note this test_schema.db gets created for all test runs.
- # there's not any dedicated cleanup step for it. it in some
- # ways corresponds to the "test.test_schema" schema that's
- # expected to be already present, so for now it just stays
- # in a given checkout directory.
- dbapi_connection.execute(
- 'ATTACH DATABASE "%s_test_schema.db" AS test_schema'
- % (engine.driver,)
- )
- else:
- dbapi_connection.execute(
- 'ATTACH DATABASE "%s_%s_test_schema.db" AS test_schema'
- % (follower_ident, engine.driver)
- )
+
+ # NOTE! this has to be done *per connection*. New sqlite connection,
+ # as we get with say, QueuePool, the attaches are gone.
+ # so schemes to delete those attached files have to be done at the
+ # filesystem level and not rely upon what attachments are in a
+ # particular SQLite connection
+ dbapi_connection.execute(
+ f'ATTACH DATABASE "{attach_path}" AS test_schema'
+ )
+
+ @event.listens_for(engine, "engine_disposed")
+ def dispose(engine):
+ """most databases should be dropped using
+ stop_test_class_outside_fixtures
+
+ however a few tests like AttachedDBTest might not get triggered on
+ that main hook
+
+ """
+
+ if os.path.exists(attach_path):
+ os.remove(attach_path)
+
+ filename = engine.url.database
+
+ if filename and filename != ":memory:" and os.path.exists(filename):
+ os.remove(filename)
@create_db.for_db("sqlite")
@drop_db.for_db("sqlite")
def _sqlite_drop_db(cfg, eng, ident):
- for path in [
- "%s.db" % ident,
- "%s_%s_test_schema.db" % (ident, eng.driver),
- ]:
- if os.path.exists(path):
- log.info("deleting SQLite database file: %s" % path)
+ _drop_dbs_w_ident(eng.url.database, eng.driver, ident)
+
+
+def _drop_dbs_w_ident(databasename, driver, ident):
+ for path in os.listdir("."):
+ fname, ext = os.path.split(path)
+ if ident in fname and ext in [".db", ".db.enc"]:
+ log.info("deleting SQLite database file: %s", path)
os.remove(path)
@stop_test_class_outside_fixtures.for_db("sqlite")
def stop_test_class_outside_fixtures(config, db, cls):
- with db.connect() as conn:
- files = [
- row.file
- for row in conn.exec_driver_sql("PRAGMA database_list")
- if row.file
- ]
-
- if files:
- db.dispose()
- # some sqlite file tests are not cleaning up well yet, so do this
- # just to make things simple for now
- for file_ in files:
- if file_ and os.path.exists(file_):
- os.remove(file_)
+ db.dispose()
@temp_table_keyword_args.for_db("sqlite")
@run_reap_dbs.for_db("sqlite")
def _reap_sqlite_dbs(url, idents):
log.info("db reaper connecting to %r", url)
-
log.info("identifiers in file: %s", ", ".join(idents))
+ url = sa_url.make_url(url)
for ident in idents:
- # we don't have a config so we can't call _sqlite_drop_db due to the
- # decorator
- for ext in ("db", "db.enc"):
- for path in (
- ["%s.%s" % (ident, ext)]
- + [
- "%s_%s.%s" % (drivername, ident, ext)
- for drivername in _drivernames
- ]
- + [
- "%s_test_schema.%s" % (drivername, ext)
- for drivername in _drivernames
- ]
- + [
- "%s_%s_test_schema.%s" % (ident, drivername, ext)
- for drivername in _drivernames
- ]
- ):
- if os.path.exists(path):
- log.info("deleting SQLite database file: %s" % path)
- os.remove(path)
+ for drivername in _drivernames:
+ _drop_dbs_w_ident(url.database, drivername, ident)
@upsert.for_db("sqlite")