from sqlalchemy import testing
from sqlalchemy.testing import fixtures
+
class BindTest(fixtures.TestBase):
def test_bind_close_engine(self):
e = testing.db
]:
assert_raises_message(
exc.UnboundExecutionError,
- "Table object 'test_table' is not bound to an Engine or Connection.",
+ ("Table object 'test_table' is not bound to an Engine or "
+ "Connection."),
meth
)
finally:
metadata.drop_all(bind=conn)
-
def test_clauseelement(self):
metadata = MetaData()
table = Table('test_table', metadata,
if isinstance(bind, engine.Connection):
bind.close()
metadata.drop_all(bind=testing.db)
-
-
metadata, users, engine = self.metadata, self.users, self.engine
canary = []
users.append_ddl_listener('before-create',
- lambda e, t, b:canary.append('mxyzptlk')
+ lambda e, t, b: canary.append('mxyzptlk')
)
users.append_ddl_listener('after-create',
- lambda e, t, b:canary.append('klptzyxm')
+ lambda e, t, b: canary.append('klptzyxm')
)
users.append_ddl_listener('before-drop',
- lambda e, t, b:canary.append('xyzzy')
+ lambda e, t, b: canary.append('xyzzy')
)
users.append_ddl_listener('after-drop',
- lambda e, t, b:canary.append('fnord')
+ lambda e, t, b: canary.append('fnord')
)
metadata.create_all()
metadata, users, engine = self.metadata, self.users, self.engine
canary = []
metadata.append_ddl_listener('before-create',
- lambda e, t, b, tables=None:canary.append('mxyzptlk')
+ lambda e, t, b, tables=None: canary.append('mxyzptlk')
)
metadata.append_ddl_listener('after-create',
- lambda e, t, b, tables=None:canary.append('klptzyxm')
+ lambda e, t, b, tables=None: canary.append('klptzyxm')
)
metadata.append_ddl_listener('before-drop',
- lambda e, t, b, tables=None:canary.append('xyzzy')
+ lambda e, t, b, tables=None: canary.append('xyzzy')
)
metadata.append_ddl_listener('after-drop',
- lambda e, t, b, tables=None:canary.append('fnord')
+ lambda e, t, b, tables=None: canary.append('fnord')
)
metadata.create_all()
metadata, users, engine = self.metadata, self.users, self.engine
nonpg_mock = engines.mock_engine(dialect_name='sqlite')
pg_mock = engines.mock_engine(dialect_name='postgresql')
- constraint = CheckConstraint('a < b', name='my_test_constraint'
- , table=users)
+ constraint = CheckConstraint('a < b', name='my_test_constraint',
+ table=users)
# by placing the constraint in an Add/Drop construct, the
# 'inline_ddl' flag is set to False
metadata, users, engine = self.metadata, self.users, self.engine
nonpg_mock = engines.mock_engine(dialect_name='sqlite')
pg_mock = engines.mock_engine(dialect_name='postgresql')
- constraint = CheckConstraint('a < b', name='my_test_constraint'
- , table=users)
+ constraint = CheckConstraint('a < b', name='my_test_constraint',
+ table=users)
# by placing the constraint in an Add/Drop construct, the
# 'inline_ddl' flag is set to False
)
-
-
class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
def mock_engine(self):
executor = lambda *a, **kw: None
dialect=dialect)
self.assert_compile(ddl.against(sane_schema), 'S S-T T-s.t-b',
dialect=dialect)
- self.assert_compile(ddl.against(insane_alone), 'S S-T T-"t t"-b'
- , dialect=dialect)
+ self.assert_compile(ddl.against(insane_alone), 'S S-T T-"t t"-b',
+ dialect=dialect)
self.assert_compile(ddl.against(insane_schema),
'S S-T T-"s s"."t t"-b', dialect=dialect)
-
def test_filter(self):
cx = self.mock_engine()
assert DDL('').execute_if(dialect=target)._should_execute(tbl, cx)
assert not DDL('').execute_if(dialect='bogus').\
_should_execute(tbl, cx)
- assert DDL('').execute_if(callable_=lambda d, y,z, **kw: True).\
+ assert DDL('').execute_if(callable_=lambda d, y, z, **kw: True).\
_should_execute(tbl, cx)
assert(DDL('').execute_if(
- callable_=lambda d, y,z, **kw: z.engine.name
+ callable_=lambda d, y, z, **kw: z.engine.name
!= 'bogus').
_should_execute(tbl, cx))
assert DDL('', on=target)._should_execute_deprecated('x', tbl, cx)
assert not DDL('', on='bogus').\
_should_execute_deprecated('x', tbl, cx)
- assert DDL('', on=lambda d, x,y,z: True).\
+ assert DDL('', on=lambda d, x, y, z: True).\
_should_execute_deprecated('x', tbl, cx)
- assert(DDL('', on=lambda d, x,y,z: z.engine.name != 'bogus').
+ assert(DDL('', on=lambda d, x, y, z: z.engine.name != 'bogus').
_should_execute_deprecated('x', tbl, cx))
def test_repr(self):
assert repr(DDL('s'))
assert repr(DDL('s', on='engine'))
assert repr(DDL('s', on=lambda x: 1))
- assert repr(DDL('s', context={'a':1}))
- assert repr(DDL('s', on='engine', context={'a':1}))
-
-
+ assert repr(DDL('s', context={'a': 1}))
+ assert repr(DDL('s', on='engine', context={'a': 1}))
return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
**kw)
+
class PoolTest(PoolTestBase):
def test_manager(self):
manager = pool.manage(MockDBAPI(), use_threadlocal=True)
]
)
-
def test_bad_args(self):
manager = pool.manage(MockDBAPI())
manager.connect(None)
class PoolDialectTest(PoolTestBase):
def _dialect(self):
canary = []
+
class PoolDialect(object):
def do_rollback(self, dbapi_connection):
canary.append('R')
def _first_connect_event_fixture(self):
p = self._queuepool_fixture()
canary = []
+
def first_connect(*arg, **kw):
canary.append('first_connect')
def _connect_event_fixture(self):
p = self._queuepool_fixture()
canary = []
+
def connect(*arg, **kw):
canary.append('connect')
+
event.listen(p, 'connect', connect)
return p, canary
def _checkout_event_fixture(self):
p = self._queuepool_fixture()
canary = []
+
def checkout(*arg, **kw):
canary.append('checkout')
event.listen(p, 'checkout', checkout)
def _checkin_event_fixture(self):
p = self._queuepool_fixture()
canary = []
+
def checkin(*arg, **kw):
canary.append('checkin')
event.listen(p, 'checkin', checkin)
def _reset_event_fixture(self):
p = self._queuepool_fixture()
canary = []
+
def reset(*arg, **kw):
canary.append('reset')
event.listen(p, 'reset', reset)
def test_listen_targets_scope(self):
canary = []
+
def listen_one(*args):
canary.append("listen_one")
+
def listen_two(*args):
canary.append("listen_two")
+
def listen_three(*args):
canary.append("listen_three")
+
def listen_four(*args):
canary.append("listen_four")
)
def test_listen_targets_per_subclass(self):
- """test that listen() called on a subclass remains specific to that subclass."""
+ """test that listen() called on a subclass remains specific to
+ that subclass."""
canary = []
+
def listen_one(*args):
canary.append("listen_one")
+
def listen_two(*args):
canary.append("listen_two")
+
def listen_three(*args):
canary.append("listen_three")
# going
pool.Pool.dispatch._clear()
+
class PoolFirstConnectSyncTest(PoolTestBase):
# test [ticket:2964]
th.join(join_timeout)
eq_(evt.mock_calls,
- [call.first_connect(), call.connect(), call.connect(), call.connect()]
+ [
+ call.first_connect(),
+ call.connect(),
+ call.connect(),
+ call.connect()]
)
-
class DeprecatedPoolListenerTest(PoolTestBase):
@testing.requires.predictable_gc
@testing.uses_deprecated(r".*Use event.listen")
if hasattr(self, 'checkin'):
self.checkin = self.inst_checkin
self.clear()
+
def clear(self):
self.connected = []
self.first_connected = []
self.checked_out = []
self.checked_in = []
+
def assert_total(innerself, conn, fconn, cout, cin):
eq_(len(innerself.connected), conn)
eq_(len(innerself.first_connected), fconn)
eq_(len(innerself.checked_out), cout)
eq_(len(innerself.checked_in), cin)
+
def assert_in(innerself, item, in_conn, in_fconn,
in_cout, in_cin):
self.assert_((item in innerself.connected) == in_conn)
self.assert_((item in innerself.first_connected) == in_fconn)
self.assert_((item in innerself.checked_out) == in_cout)
self.assert_((item in innerself.checked_in) == in_cin)
+
def inst_connect(self, con, record):
print("connect(%s, %s)" % (con, record))
assert con is not None
assert record is not None
self.connected.append(con)
+
def inst_first_connect(self, con, record):
print("first_connect(%s, %s)" % (con, record))
assert con is not None
assert record is not None
self.first_connected.append(con)
+
def inst_checkout(self, con, record, proxy):
print("checkout(%s, %s, %s)" % (con, record, proxy))
assert con is not None
assert record is not None
assert proxy is not None
self.checked_out.append(con)
+
def inst_checkin(self, con, record):
print("checkin(%s, %s)" % (con, record))
# con can be None if invalidated
class ListenAll(tsa.interfaces.PoolListener, InstrumentingListener):
pass
+
class ListenConnect(InstrumentingListener):
def connect(self, con, record):
pass
+
class ListenFirstConnect(InstrumentingListener):
def first_connect(self, con, record):
pass
+
class ListenCheckOut(InstrumentingListener):
def checkout(self, con, record, proxy, num):
pass
+
class ListenCheckIn(InstrumentingListener):
def checkin(self, con, record):
pass
def test_listeners_callables(self):
def connect(dbapi_con, con_record):
counts[0] += 1
+
def checkout(dbapi_con, con_record, con_proxy):
counts[1] += 1
+
def checkin(dbapi_con, con_record):
counts[2] += 1
pool_size=2,
max_overflow=1, use_threadlocal=False, timeout=3)
timeouts = []
+
def checkout():
for x in range(1):
now = time.time()
dbapi = MockDBAPI()
mutex = threading.Lock()
+
def creator():
time.sleep(.05)
with mutex:
pool_size=3, timeout=2,
max_overflow=max_overflow)
peaks = []
+
def whammy():
for i in range(10):
try:
lazy_gc()
assert not pool._refs
-
def test_overflow_reset_on_failed_connect(self):
dbapi = Mock()
raise Exception("connection failed")
creator = dbapi.connect
+
def create():
return creator()
call("overflow_one")]
)
-
@testing.requires.threading_with_mock
def test_waiters_handled(self):
"""test that threads waiting for connections are
"""
mutex = threading.Lock()
dbapi = MockDBAPI()
+
def creator():
mutex.acquire()
try:
p = pool.QueuePool(creator=creator,
pool_size=2, timeout=timeout,
max_overflow=max_overflow)
+
def waiter(p, timeout, max_overflow):
success_key = (timeout, max_overflow)
conn = p.connect()
dbapi = MockDBAPI()
canary = []
+
def creator():
canary.append(1)
return dbapi.connect()
p1 = pool.QueuePool(creator=creator,
pool_size=1, timeout=None,
max_overflow=0)
+
def waiter(p):
conn = p.connect()
canary.append(2)
def test_mixed_close(self):
pool._refs.clear()
- p = self._queuepool_fixture(pool_size=3, max_overflow=-1, use_threadlocal=True)
+ p = self._queuepool_fixture(pool_size=3, max_overflow=-1,
+ use_threadlocal=True)
c1 = p.connect()
c2 = p.connect()
assert c1 is c2
# disable weakref collection of the
# underlying connections
strong_refs = set()
+
def _conn():
c = p.connect()
strong_refs.add(c.connection)
dialect.dbapi.Error = Error
pools = []
+
class TrackQueuePool(pool.QueuePool):
def __init__(self, *arg, **kw):
pools.append(self)
def attempt(conn):
time.sleep(random.random())
try:
- conn._handle_dbapi_exception(Error(), "statement", {}, Mock(), Mock())
+ conn._handle_dbapi_exception(Error(), "statement", {},
+ Mock(), Mock())
except tsa.exc.DBAPIError:
pass
- # run an error + invalidate operation on the remaining 7 open connections
+ # run an error + invalidate operation on the remaining 7 open
+ # connections
threads = []
for conn in conns:
t = threading.Thread(target=attempt, args=(conn, ))
assert c1.connection.id != c_id
def test_recreate(self):
- p = self._queuepool_fixture(reset_on_return=None, pool_size=1, max_overflow=0)
+ p = self._queuepool_fixture(reset_on_return=None, pool_size=1,
+ max_overflow=0)
p2 = p.recreate()
assert p2.size() == 1
assert p2._reset_on_return is pool.reset_none
eq_(c2_con.close.call_count, 0)
def test_threadfairy(self):
- p = self._queuepool_fixture(pool_size=3, max_overflow=-1, use_threadlocal=True)
+ p = self._queuepool_fixture(pool_size=3, max_overflow=-1,
+ use_threadlocal=True)
c1 = p.connect()
c1.close()
c2 = p.connect()
assert c2.connection is not None
+
class ResetOnReturnTest(PoolTestBase):
def _fixture(self, **kw):
dbapi = Mock()
- return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), **kw)
+ return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
+ **kw)
def test_plain_rollback(self):
dbapi, p = self._fixture(reset_on_return='rollback')
assert not dbapi.connect().rollback.called
assert dbapi.connect().commit.called
+
class SingletonThreadPoolTest(PoolTestBase):
@testing.requires.threading_with_mock
dbapi = MockDBAPI()
lock = threading.Lock()
+
def creator():
# the mock iterator isn't threadsafe...
with lock:
if strong_refs:
sr = set()
+
def _conn():
c = p.connect()
sr.add(c.connection)
still_opened = len([c for c in sr if not c.close.call_count])
eq_(still_opened, 3)
+
class AssertionPoolTest(PoolTestBase):
def test_connect_error(self):
dbapi = MockDBAPI()
c3 = p.connect()
assert_raises(AssertionError, p.connect)
+
class NullPoolTest(PoolTestBase):
def test_reconnect(self):
dbapi = MockDBAPI()
)
)
+
class CDateProcessorTest(_DateProcessorTest):
__requires__ = ('cextensions',)
+
@classmethod
def setup_class(cls):
from sqlalchemy import cprocessors
def test_distill_single_list_tuples(self):
eq_(
- self.module._distill_params(([("foo", "bar"), ("bat", "hoho")],), {}),
+ self.module._distill_params(
+ ([("foo", "bar"), ("bat", "hoho")],), {}),
[('foo', 'bar'), ('bat', 'hoho')]
)
def test_distill_multi_list_tuple(self):
eq_(
self.module._distill_params(
- ([("foo", "bar")], [("bar", "bat")]),
- {}
- ),
+ ([("foo", "bar")], [("bar", "bat")]), {}),
([('foo', 'bar')], [('bar', 'bat')])
)
def test_distill_single_list_dicts(self):
eq_(
- self.module._distill_params(([{"foo": "bar"}, {"foo": "hoho"}],), {}),
+ self.module._distill_params(
+ ([{"foo": "bar"}, {"foo": "hoho"}],), {}),
[{'foo': 'bar'}, {'foo': 'hoho'}]
)
)
-
class PyDistillArgsTest(_DistillArgsTest):
@classmethod
def setup_class(cls):
)
)
+
class CDistillArgsTest(_DistillArgsTest):
__requires__ = ('cextensions', )
+
@classmethod
def setup_class(cls):
from sqlalchemy import cutils as util
metadata, users = None, None
+
class ReflectionTest(fixtures.TestBase, ComparesTables):
__backend__ = True
"""
Table('a', self.metadata, Column('id', Integer, primary_key=True))
Table('b', self.metadata, Column('id', Integer, primary_key=True),
- Column('a_id', Integer, sa.ForeignKey('a.id')))
+ Column('a_id', Integer, sa.ForeignKey('a.id')))
self.metadata.create_all()
m2 = MetaData()
"""
Table('a', self.metadata, Column('id', Integer, primary_key=True))
Table('b', self.metadata, Column('id', Integer, primary_key=True),
- Column('a_id', Integer, sa.ForeignKey('a.id')))
+ Column('a_id', Integer, sa.ForeignKey('a.id')))
self.metadata.create_all()
m2 = MetaData()
eq_(list(table.primary_key), [table.c.col1])
eq_(table.c.col1.primary_key, True)
-
@testing.provide_metadata
def test_override_pkfk(self):
"""test that you can override columns which contain foreign keys
Column('id', sa.Integer, primary_key=True),
Column('street', sa.String(30)))
-
meta.create_all()
meta2 = MetaData(testing.db)
a2 = Table('addresses', meta2,
assert f1 in b1.constraints
assert len(b1.constraints) == 2
-
-
@testing.provide_metadata
def test_override_keys(self):
"""test that columns can be overridden with a 'key',
backends with {dialect}.get_foreign_keys() support)"""
if testing.against('postgresql'):
- test_attrs = ('match', 'onupdate', 'ondelete', 'deferrable', 'initially')
+ test_attrs = ('match', 'onupdate', 'ondelete',
+ 'deferrable', 'initially')
addresses_user_id_fkey = sa.ForeignKey(
# Each option is specifically not a Postgres default, or
# it won't be returned by PG's inspection
'users.id',
- name = 'addresses_user_id_fkey',
+ name='addresses_user_id_fkey',
match='FULL',
onupdate='RESTRICT',
ondelete='RESTRICT',
# elided by MySQL's inspection
addresses_user_id_fkey = sa.ForeignKey(
'users.id',
- name = 'addresses_user_id_fkey',
+ name='addresses_user_id_fkey',
onupdate='CASCADE',
ondelete='CASCADE'
)
Column('slot', sa.String(128)),
)
- assert_raises_message(sa.exc.InvalidRequestError,
- "Foreign key associated with column 'slots.pkg_id' "
- "could not find table 'pkgs' with which to generate "
- "a foreign key to target column 'pkg_id'",
- metadata.create_all)
+ assert_raises_message(
+ sa.exc.InvalidRequestError,
+ "Foreign key associated with column 'slots.pkg_id' "
+ "could not find table 'pkgs' with which to generate "
+ "a foreign key to target column 'pkg_id'",
+ metadata.create_all)
def test_composite_pks(self):
"""test reflection of a composite primary key"""
table.c.multi_hoho
== table2.c.lala).compare(j.onclause))
-
@testing.crashes('oracle', 'FIXME: unknown, confirm not fails_on')
@testing.provide_metadata
def test_reserved(self):
def test_reflect_uses_bind_engine_reflect(self):
self._test_reflect_uses_bind(lambda e: MetaData().reflect(e))
-
@testing.provide_metadata
def test_reflect_all(self):
existing = testing.db.table_names()
finally:
_drop_views(metadata.bind)
+
class CreateDropTest(fixtures.TestBase):
__backend__ = True
eq_(ua, ['users', 'email_addresses'])
eq_(oi, ['orders', 'items'])
-
def test_checkfirst(self):
try:
assert not users.exists(testing.db)
- set(testing.db.table_names()))
metadata.drop_all(bind=testing.db)
+
class SchemaManipulationTest(fixtures.TestBase):
__backend__ = True
assert len(addresses.c.user_id.foreign_keys) == 1
assert addresses.constraints == set([addresses.primary_key, fk])
+
class UnicodeReflectionTest(fixtures.TestBase):
__backend__ = True
('plain', 'col_plain', 'ix_plain')
])
no_has_table = [
- ('no_has_table_1', ue('col_Unit\u00e9ble'), ue('ix_Unit\u00e9ble')),
- ('no_has_table_2', ue('col_\u6e2c\u8a66'), ue('ix_\u6e2c\u8a66')),
+ (
+ 'no_has_table_1',
+ ue('col_Unit\u00e9ble'),
+ ue('ix_Unit\u00e9ble')
+ ),
+ (
+ 'no_has_table_2',
+ ue('col_\u6e2c\u8a66'),
+ ue('ix_\u6e2c\u8a66')
+ ),
]
no_case_sensitivity = [
- (ue('\u6e2c\u8a66'), ue('col_\u6e2c\u8a66'), ue('ix_\u6e2c\u8a66')),
- (ue('unit\u00e9ble'), ue('col_unit\u00e9ble'), ue('ix_unit\u00e9ble')),
+ (
+ ue('\u6e2c\u8a66'),
+ ue('col_\u6e2c\u8a66'),
+ ue('ix_\u6e2c\u8a66')
+ ),
+ (
+ ue('unit\u00e9ble'),
+ ue('col_unit\u00e9ble'),
+ ue('ix_unit\u00e9ble')
+ ),
]
full = [
- (ue('Unit\u00e9ble'), ue('col_Unit\u00e9ble'), ue('ix_Unit\u00e9ble')),
- (ue('\u6e2c\u8a66'), ue('col_\u6e2c\u8a66'), ue('ix_\u6e2c\u8a66')),
+ (
+ ue('Unit\u00e9ble'),
+ ue('col_Unit\u00e9ble'),
+ ue('ix_Unit\u00e9ble')
+ ),
+ (
+ ue('\u6e2c\u8a66'),
+ ue('col_\u6e2c\u8a66'),
+ ue('ix_\u6e2c\u8a66')
+ ),
]
# as you can see, our options for this kind of thing
[(names[tname][1], names[tname][0])]
)
+
class SchemaTest(fixtures.TestBase):
__backend__ = True
)
-
-
# Tests related to engine.reflection
dingalings = Table("dingalings", meta,
Column('dingaling_id', sa.Integer, primary_key=True),
Column('address_id', sa.Integer,
- sa.ForeignKey('%semail_addresses.address_id' % schema_prefix)),
+ sa.ForeignKey(
+ '%semail_addresses.address_id' % schema_prefix)),
Column('data', sa.String(30)),
schema=schema, test_needs_fk=True,
)
return (users, addresses, dingalings)
+
def createIndexes(con, schema=None):
fullname = 'users'
if schema:
query = "CREATE INDEX users_t_idx ON %s (test1, test2)" % fullname
con.execute(sa.sql.text(query))
+
@testing.requires.views
def _create_views(con, schema=None):
for table_name in ('users', 'email_addresses'):
if schema:
fullname = "%s.%s" % (schema, table_name)
view_name = fullname + '_v'
- query = "CREATE VIEW %s AS SELECT * FROM %s" % (view_name,
- fullname)
+ query = "CREATE VIEW %s AS SELECT * FROM %s" % (view_name, fullname)
con.execute(sa.sql.text(query))
+
@testing.requires.views
def _drop_views(con, schema=None):
for table_name in ('email_addresses', 'users'):
'weird_casing."Col2", weird_casing."col3" '
'FROM weird_casing')
+
class CaseSensitiveTest(fixtures.TablesTest):
"""Nail down case sensitive behaviors, mostly on MySQL."""
__backend__ = True
)
def test_reflect_via_fk(self):
m = MetaData()
- t2 = Table("SomeOtherTable", m, autoload=True, autoload_with=testing.db)
+ t2 = Table("SomeOtherTable", m, autoload=True,
+ autoload_with=testing.db)
eq_(t2.name, "SomeOtherTable")
assert "SomeTable" in m.tables
eq_(t2.name, "sOmEtAbLe")
-
class ColumnEventsTest(fixtures.RemovesEvents, fixtures.TestBase):
__backend__ = True
from sqlalchemy.schema import Table
m = MetaData(testing.db)
+
def column_reflect(insp, table, column_info):
if column_info['name'] == col:
column_info.update(update)
def test_override_key_fk(self):
m = MetaData(testing.db)
+
def column_reflect(insp, table, column_info):
if column_info['name'] == 'q':
users, metadata = None, None
+
+
class TransactionTest(fixtures.TestBase):
__backend__ = True
global users, metadata
metadata = MetaData()
users = Table('query_users', metadata,
- Column('user_id', INT, primary_key = True),
+ Column('user_id', INT, primary_key=True),
Column('user_name', VARCHAR(20)),
test_needs_acid=True,
)
order_by(users.c.user_id))
eq_(result.fetchall(), [])
+
class ResetAgentTest(fixtures.TestBase):
__backend__ = True
trans.rollback()
assert connection.connection._reset_agent is None
+
class AutoRollbackTest(fixtures.TestBase):
__backend__ = True
users.drop(conn2)
conn2.close()
+
class ExplicitAutoCommitTest(fixtures.TestBase):
"""test the 'autocommit' flag on select() and text() objects.
eq_(conn.get_isolation_level(),
self._non_default_isolation_level())
eq_(c2.get_isolation_level(), self._non_default_isolation_level())
-