self.filename = name
self.element = element
+
# PART IV - Persistence Mapping
# Node class. a non-public class which will represent the DB-persisted
def find_document(path, compareto):
- j = documents
- prev_elements = None
query = session.query(Document)
attribute = "_root"
for i, match in enumerate(
def find_document(path, compareto):
- j = documents
- prev_elements = None
query = session.query(Document)
first = True
for i, match in enumerate(
if len(sys.argv) > 1:
potential_name = sys.argv[1]
try:
- suite = __import__(__name__ + "." + potential_name)
+ __import__(__name__ + "." + potential_name)
except ImportError:
pass
args.profile = args.profile or args.dump or args.runsnake
if cls.name is None:
- suite = __import__(__name__ + "." + args.name)
+ __import__(__name__ + "." + args.name)
Profiler(args).run()
"""Load fully tracked ORM objects into one big list()."""
sess = Session(engine)
- objects = list(sess.query(Customer).limit(n))
+ list(sess.query(Customer).limit(n))
@Profiler.profile
with engine.connect() as conn:
result = conn.execute(Customer.__table__.select().limit(n)).fetchall()
for row in result:
- data = row["id"], row["name"], row["description"]
+ row["id"], row["name"], row["description"]
@Profiler.profile
if not chunk:
break
for row in chunk:
- data = row["id"], row["name"], row["description"]
+ row["id"], row["name"], row["description"]
@Profiler.profile
if not chunk:
break
for row in chunk:
- data = row["id"], row["name"], row["description"]
+ row["id"], row["name"], row["description"]
@Profiler.profile
if make_objects:
for row in cursor.fetchall():
# ensure that we fully fetch!
- customer = SimpleCustomer(
- id_=row[0], name=row[1], description=row[2]
- )
+ SimpleCustomer(id_=row[0], name=row[1], description=row[2])
else:
for row in cursor.fetchall():
# ensure that we fully fetch!
- data = row[0], row[1], row[2]
+ row[0], row[1], row[2]
conn.close()
conn = engine.pool._creator()
cursor = conn.cursor()
cursor.execute(sql, arg)
- lastrowid = cursor.lastrowid
+ cursor.lastrowid
conn.commit()
conn.close()
else:
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(sql, arg)
- lastrowid = cursor.lastrowid
+ cursor.lastrowid
conn.commit()
conn.close()
# group rows by constraint ID, to handle multi-column FKs
fkeys = []
- fknm, scols, rcols = (None, [], [])
def fkey_rec():
return {
# without closing it (FreeTDS particularly)
row = self.cursor.fetchall()[0]
break
- except self.dialect.dbapi.Error as e:
+ except self.dialect.dbapi.Error:
# no way around this - nextset() consumes the previous set
# so we need to just keep flipping
self.cursor.nextset()
def _merge_textual_cols_by_position(
self, context, cursor_description, result_columns
):
- dialect = context.dialect
num_ctx_cols = len(result_columns) if result_columns else None
if num_ctx_cols > len(cursor_description):
yield idx, colname, mapped_type, coltype, obj, untranslated
def _merge_cols_by_none(self, context, cursor_description):
- dialect = context.dialect
for (
idx,
colname,
@classmethod
def _listen(cls, event_key, raw=False, propagate=False, **kw):
- target, identifier, fn = (
- event_key.dispatch_target,
- event_key.identifier,
- event_key._listen_fn,
- )
+ target, fn = (event_key.dispatch_target, event_key._listen_fn)
if not raw:
def _listen(
cls, event_key, raw=False, propagate=False, retval=False, **kw
):
- target, identifier, fn = (
- event_key.dispatch_target,
- event_key.identifier,
- event_key.fn,
- )
+ target = event_key.dispatch_target
if target.class_ in target.all_holds:
collection = target.all_holds[target.class_]
)
def remove(self, event_key):
- target, identifier, fn = (
- event_key.dispatch_target,
- event_key.identifier,
- event_key.fn,
- )
+ target = event_key.dispatch_target
if isinstance(target, _EventsHold):
collection = target.all_holds[target.class_]
propagate=False,
):
- target, identifier, fn = (
- event_key.dispatch_target,
- event_key.identifier,
- event_key._listen_fn,
- )
+ target, fn = event_key.dispatch_target, event_key._listen_fn
if active_history:
target.dispatch._active_history = True
_registry = util.defaultdict(dict)
-_case_sensitive_registry = util.defaultdict(
- lambda: util.defaultdict(dict)
-)
+_case_sensitive_registry = util.defaultdict(lambda: util.defaultdict(dict))
_CASE_SENSITIVE = util.symbol(
name="case_sensitive_function",
doc="Symbol to mark the functions that are switched into case-sensitive "
- "mode.")
+ "mode.",
+)
def register_function(identifier, fn, package="_default"):
if raw_identifier in case_sensitive_reg[identifier]:
util.warn(
"The GenericFunction '{}' is already registered and "
- "is going to be overriden.".format(identifier))
+ "is going to be overriden.".format(identifier)
+ )
reg[identifier] = fn
else:
# If a function with the same lowercase identifier is registered,
"future release.".format(
raw_identifier,
list(case_sensitive_reg[identifier].keys())[0],
- ))
+ )
+ )
reg[identifier] = _CASE_SENSITIVE
# Check if a function with different letter case identifier is registered.
elif identifier in case_sensitive_reg:
# Note: This case will be removed in a later release.
- if (
- raw_identifier not in case_sensitive_reg[identifier]
- ):
+ if raw_identifier not in case_sensitive_reg[identifier]:
util.warn_deprecated(
"GenericFunction(s) '{}' are already registered with "
"different letter cases and might interact with '{}'. "
"GenericFunction objects will be fully case-insensitive in a "
"future release.".format(
sorted(case_sensitive_reg[identifier].keys()),
- raw_identifier))
+ raw_identifier,
+ )
+ )
else:
util.warn(
"The GenericFunction '{}' is already registered and "
- "is going to be overriden.".format(raw_identifier))
+ "is going to be overriden.".format(raw_identifier)
+ )
# Register by default
else:
def __init__(self, fn, left_index, right_index):
left = fn.clauses.clauses[left_index - 1]
right = fn.clauses.clauses[right_index - 1]
+
self.sql_function = fn
self.left_index = left_index
self.right_index = right_index
cls.type = clsdict["__return_type__"]
# Check _register attribute status
- cls._register = getattr(cls, '_register', True)
+ cls._register = getattr(cls, "_register", True)
# Register the function if required
if cls._register:
if sub_cls is not test_class.cls:
per_cls_dict = rebuilt_items[test_class.cls]
- names = [i.name for i in items]
for inst in pytest.Class(
sub_cls.__name__, parent=test_class.parent.parent
).collect():
"remote_table_2",
]
meta = self.metadata
- users, addresses, dingalings = (
- self.tables.users,
- self.tables.email_addresses,
- self.tables.dingalings,
- )
+
insp = inspect(meta.bind)
if table_type == "view":
def _test_get_columns(self, schema=None, table_type="table"):
meta = MetaData(testing.db)
- users, addresses, dingalings = (
- self.tables.users,
- self.tables.email_addresses,
- self.tables.dingalings,
- )
+ users, addresses = (self.tables.users, self.tables.email_addresses)
table_names = ["users", "email_addresses"]
if table_type == "view":
table_names = ["users_v", "email_addresses_v"]
@testing.provide_metadata
def _test_get_foreign_keys(self, schema=None):
meta = self.metadata
- users, addresses, dingalings = (
- self.tables.users,
- self.tables.email_addresses,
- self.tables.dingalings,
- )
+ users, addresses = (self.tables.users, self.tables.email_addresses)
insp = inspect(meta.bind)
expected_schema = schema
# users
@testing.provide_metadata
def _test_get_indexes(self, schema=None):
meta = self.metadata
- users, addresses, dingalings = (
- self.tables.users,
- self.tables.email_addresses,
- self.tables.dingalings,
- )
+
# The database may decide to create indexes for foreign keys, etc.
# so there may be more indexes than expected.
insp = inspect(meta.bind)
@testing.provide_metadata
def _test_get_view_definition(self, schema=None):
meta = self.metadata
- users, addresses, dingalings = (
- self.tables.users,
- self.tables.email_addresses,
- self.tables.dingalings,
- )
view_name1 = "users_v"
view_name2 = "email_addresses_v"
insp = inspect(meta.bind)
@testing.provide_metadata
def _test_get_table_oid(self, table_name, schema=None):
meta = self.metadata
- users, addresses, dingalings = (
- self.tables.users,
- self.tables.email_addresses,
- self.tables.dingalings,
- )
insp = inspect(meta.bind)
oid = insp.get_table_oid(table_name, schema)
self.assert_(isinstance(oid, int))
def test_roundtrip(self):
md = self.metadata
- engine = self._fixture(True)
+ self._fixture(True)
test_table = Table(
"test_table",
md,
nkwargs = co.co_kwonlyargcount if py3k else 0
args = list(names[:nargs])
kwonlyargs = list(names[nargs : nargs + nkwargs])
- step = 0
nargs += nkwargs
varargs = None
check_defaults = ()
check_kw = set(messages)
- has_kw = spec.varkw is not None
-
@decorator
def warned(fn, *args, **kwargs):
for m in check_defaults:
nargs = co.co_argcount
return (
list(co.co_varnames[:nargs]),
- bool(co.co_flags & inspect.CO_VARKEYWORDS),
+ bool(co.co_flags & co_varkeywords),
)
show-source = true
enable-extensions = G
# E203 is due to https://github.com/PyCQA/pycodestyle/issues/373
-# F841 is "variable assigned but never used". lots of these are in
-# code examples, as well as in tests where sometimes they serve to keep
-# an object in scope. It would be nice to have less of these but at the
-# same time some are needed and some make code more readable
ignore =
A003,
D,
E203,E305,E711,E712,E721,E722,E741,
- F821,F841
+ F821
N801,N802,N806,
RST304,RST303,RST299,RST399,
W503,W504
Column("a_id", ForeignKey("a.id")),
)
m1 = mapper(A, a, properties={"bs": relationship(B)})
- m2 = mapper(B, b)
+ mapper(B, b)
@profile_memory()
def go():
def insp_somesubfoo(subject):
return 2
- somefoo = SomeFoo()
+ SomeFoo()
eq_(inspect(SomeFoo()), 1)
eq_(inspect(SomeSubFoo()), 2)
for id_ in (25, 24, 23, 14, 12, 19, 18, 17, 16, 15):
assert id_ in lru
- i1 = lru[25]
+ lru[25]
i2 = item(25)
lru[25] = i2
assert 25 in lru
# default
s = util.pickle.dumps(sym1)
- sym3 = util.pickle.loads(s)
+ util.pickle.loads(s)
for protocol in 0, 1, 2:
print(protocol)
table_name = "testtbl"
constraint_name = "constraint"
constraint = CheckConstraint("data IS NOT NULL", name=constraint_name)
- tbl = Table(table_name, m, Column("data", String(255)), constraint)
+ Table(table_name, m, Column("data", String(255)), constraint)
dialect = mysql.dialect()
self.assert_compile(
schema.DropConstraint(constraint),
- "ALTER TABLE %s DROP CHECK `%s`"
- % (table_name, constraint_name),
- dialect=dialect
+ "ALTER TABLE %s DROP CHECK `%s`" % (table_name, constraint_name),
+ dialect=dialect,
)
def test_drop_constraint_mariadb(self):
table_name = "testtbl"
constraint_name = "constraint"
constraint = CheckConstraint("data IS NOT NULL", name=constraint_name)
- tbl = Table(table_name, m, Column("data", String(255)), constraint)
+ Table(table_name, m, Column("data", String(255)), constraint)
dialect = mysql.dialect()
dialect.server_version_info = (10, 1, 1, "MariaDB")
self.assert_compile(
schema.DropConstraint(constraint),
"ALTER TABLE %s DROP CONSTRAINT `%s`"
% (table_name, constraint_name),
- dialect=dialect
+ dialect=dialect,
)
def test_create_index_with_length_quoted(self):
"sqla_testing", "postgres_test_db_link"
)
- testtable = Table(
+ Table(
"testtable",
metadata,
Column("id", Integer, primary_key=True),
def test_get_foreign_table_names(self):
inspector = inspect(testing.db)
- with testing.db.connect() as conn:
+ with testing.db.connect():
ft_names = inspector.get_foreign_table_names()
eq_(ft_names, ["test_foreigntable"])
def test_get_table_names_no_foreign(self):
inspector = inspect(testing.db)
- with testing.db.connect() as conn:
+ with testing.db.connect():
names = inspector.get_table_names()
eq_(names, ["testtable"])
@testing.provide_metadata
def test_renamed_sequence_reflection(self):
metadata = self.metadata
- t = Table("t", metadata, Column("id", Integer, primary_key=True))
+ Table("t", metadata, Column("id", Integer, primary_key=True))
metadata.create_all()
m2 = MetaData(testing.db)
t2 = Table("t", m2, autoload=True, implicit_returning=False)
@testing.provide_metadata
def test_altered_type_autoincrement_pk_reflection(self):
metadata = self.metadata
- t = Table(
+ Table(
"t",
metadata,
Column("id", Integer, primary_key=True),
@testing.provide_metadata
def test_renamed_pk_reflection(self):
metadata = self.metadata
- t = Table("t", metadata, Column("id", Integer, primary_key=True))
+ Table("t", metadata, Column("id", Integer, primary_key=True))
metadata.create_all()
testing.db.connect().execution_options(autocommit=True).execute(
"alter table t rename id to t_id"
m1 = MetaData(conn)
- t1_schema = Table(
- "some_table", m1, schema="test_schema", autoload=True
- )
+ Table("some_table", m1, schema="test_schema", autoload=True)
t2_schema = Table(
"some_other_table", m1, schema="test_schema_2", autoload=True
)
metadata = self.metadata
- t1 = Table(
+ Table(
"party",
metadata,
Column("id", String(10), nullable=False),
metadata = self.metadata
- t1 = Table(
+ Table(
"t",
metadata,
Column("id", Integer, primary_key=True),
@classmethod
def define_tables(cls, metadata):
- data_table = Table(
+ Table(
"data_table",
metadata,
Column("id", Integer, primary_key=True),
metadata = self.metadata
e1 = Enum("one", "two", "three", name="myenum")
- t1 = Table("e1", metadata, Column("c1", e1))
+ Table("e1", metadata, Column("c1", e1))
- t2 = Table("e2", metadata, Column("c1", e1))
+ Table("e2", metadata, Column("c1", e1))
metadata.create_all(checkfirst=False)
metadata.drop_all(checkfirst=False)
"""
metadata = self.metadata
- e1 = Enum("one", "two", "three", name="myenum", metadata=self.metadata)
+ Enum("one", "two", "three", name="myenum", metadata=self.metadata)
metadata.create_all(checkfirst=False)
assert "myenum" in [e["name"] for e in inspect(testing.db).get_enums()]
etype = Enum(
"four", "five", "six", name="fourfivesixtype", metadata=metadata
)
- t1 = Table(
+ Table(
"table",
metadata,
Column("id", Integer, primary_key=True),
Column("ff", Float(asdecimal=False), default=1),
)
metadata.create_all()
- r = t.insert().execute()
+ t.insert().execute()
row = t.select().execute().first()
assert isinstance(row[1], decimal.Decimal)
def setup(self):
self.conn = testing.db.connect()
- trans = self.conn.begin()
+ self.conn.begin()
def teardown(self):
self.conn.close()
from sqlalchemy.schema import CheckConstraint
from sqlalchemy.schema import DDL
from sqlalchemy.schema import DropConstraint
+from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
assert "fnord" in strings
def test_conditional_constraint(self):
- metadata, users, engine = self.metadata, self.users, self.engine
+ metadata, users = self.metadata, self.users
nonpg_mock = engines.mock_engine(dialect_name="sqlite")
pg_mock = engines.mock_engine(dialect_name="postgresql")
constraint = CheckConstraint(
@testing.uses_deprecated(r".*use the DDLEvents")
def test_conditional_constraint_deprecated(self):
- metadata, users, engine = self.metadata, self.users, self.engine
+ metadata, users = self.metadata, self.users
nonpg_mock = engines.mock_engine(dialect_name="sqlite")
pg_mock = engines.mock_engine(dialect_name="postgresql")
constraint = CheckConstraint(
table = self.users
ddl = DDL("SELECT 1")
- for py in (
- "engine.execute(ddl)",
- "engine.execute(ddl, table)",
- "cx.execute(ddl)",
- "cx.execute(ddl, table)",
- "ddl.execute(engine)",
- "ddl.execute(engine, table)",
- "ddl.execute(cx)",
- "ddl.execute(cx, table)",
+ for spec in (
+ (engine.execute, ddl),
+ (engine.execute, ddl, table),
+ (cx.execute, ddl),
+ (cx.execute, ddl, table),
+ (ddl.execute, engine),
+ (ddl.execute, engine, table),
+ (ddl.execute, cx),
+ (ddl.execute, cx, table),
):
- r = eval(py)
- assert list(r) == [(1,)], py
+ fn = spec[0]
+ arg = spec[1:]
+ r = fn(*arg)
+ eq_(list(r), [(1,)])
- for py in ("ddl.execute()", "ddl.execute(target=table)"):
- try:
- r = eval(py)
- assert False
- except tsa.exc.UnboundExecutionError:
- pass
+ for fn, kw in ((ddl.execute, {}), (ddl.execute, dict(target=table))):
+ assert_raises(tsa.exc.UnboundExecutionError, fn, **kw)
for bind in engine, cx:
ddl.bind = bind
- for py in ("ddl.execute()", "ddl.execute(target=table)"):
- r = eval(py)
- assert list(r) == [(1,)], py
+ for fn, kw in (
+ (ddl.execute, {}),
+ (ddl.execute, dict(target=table)),
+ ):
+ r = fn(**kw)
+ eq_(list(r), [(1,)])
def test_platform_escape(self):
"""test the escaping of % characters in the DDL construct."""
r1 = tlengine.execute(select([1]))
r2 = tlengine.execute(select([1]))
- row1 = r1.fetchone()
- row2 = r2.fetchone()
+ r1.fetchone()
+ r2.fetchone()
r1.close()
assert r2.connection is r1.connection
assert not r2.connection.closed
def test_dispose(self):
with _tlengine_deprecated():
eng = testing_engine(options=dict(strategy="threadlocal"))
- result = eng.execute(select([1]))
+ eng.execute(select([1]))
eng.dispose()
eng.execute(select([1]))
Engine._has_events = False
def _assert_stmts(self, expected, received):
- orig = list(received)
+ list(received)
for stmt, params, posn in expected:
if not received:
assert False, "Nothing available for stmt: %s" % stmt
r"is.*(?:i_dont_exist|does not exist)",
py2konly=True,
):
- with patch.object(conn.dialect, "do_rollback", boom) as patched:
+ with patch.object(conn.dialect, "do_rollback", boom):
assert_raises_message(
tsa.exc.OperationalError,
"rollback failed",
connection, and additionally dispose the previous connection
pool and recreate."""
- db_pool = self.db.pool
-
# make a connection
conn = self.db.connect()
dbapi = self.dbapi
- mock_dialect = Mock()
-
class MyURL(URL):
def _get_entrypoint(self):
return Dialect
initialize = Mock()
engine = create_engine(MyURL("foo://"), module=dbapi)
- c1 = engine.connect()
+ engine.connect()
engine.dispose()
- c2 = engine.connect()
+ engine.connect()
eq_(Dialect.initialize.call_count, 1)
def test_invalidate_conn_w_contextmanager_interrupt(self):
eq_(c1.execute(select([1])).scalar(), 1)
- p1 = self.engine.pool
self.engine.test_shutdown()
_assert_invalidated(c1.execute, select([1]))
@testing.requires.savepoints
def test_rollback_on_invalid_savepoint(self):
conn = self.engine.connect()
- trans = conn.begin()
+ conn.begin()
trans2 = conn.begin_nested()
conn.invalidate()
trans2.rollback()
engine.dialect.initialize = broken_initialize
- p1 = engine.pool
-
def is_disconnect(e, conn, cursor):
return True
@testing.requires.implicit_default_schema
@testing.provide_metadata
def test_reflect_all_schemas_default_overlap(self):
- t1 = Table("t", self.metadata, Column("id", Integer, primary_key=True))
+ Table("t", self.metadata, Column("id", Integer, primary_key=True))
- t2 = Table(
+ Table(
"t",
self.metadata,
Column("id1", sa.ForeignKey("t.id")),
trans2.rollback()
raise
transaction.rollback()
- except Exception as e:
+ except Exception:
transaction.rollback()
raise
except Exception as e:
connection.execute(users.insert(), user_id=2, user_name="user2")
try:
connection.execute(users.insert(), user_id=2, user_name="user2.5")
- except Exception as e:
+ except Exception:
trans.__exit__(*sys.exc_info())
assert not trans.is_active
connection = testing.db.connect()
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
- trans2 = connection.begin_nested()
+ connection.begin_nested()
connection.execute(users.insert(), user_id=2, user_name="user2")
trans3 = connection.begin()
connection.execute(users.insert(), user_id=3, user_name="user3")
with eng.connect() as conn:
rec = conn.connection._connection_record
raw_dbapi_con = rec.connection
- xa = conn.begin_twophase()
+ conn.begin_twophase()
conn.execute(users.insert(), user_id=1, user_name="user1")
assert rec.connection is raw_dbapi_con
# this is the failure mode, where B is being handled by
# declarative and is in the registry but not mapped yet.
result[0] = oe
- except exc.InvalidRequestError as err:
+ except exc.InvalidRequestError:
# if make_b() starts too slowly, we can reach here, because
# B isn't in the registry yet. We can't guard against this
# case in the library because a class can refer to a name that
event.listen(WeatherLocation, "load", load)
sess = self._fixture_data()
- tokyo = (
+ tokyo = ( # noqa
sess.query(WeatherLocation)
.filter_by(city="Tokyo")
.set_shard("asia")
polymorphic_on=employee_join.c.type,
polymorphic_identity="person",
)
- engineer_mapper = mapper(
+ mapper(
Engineer,
engineers,
inherits=person_mapper,
polymorphic_identity="engineer",
)
- manager_mapper = mapper(
+ mapper(
Manager,
managers,
inherits=person_mapper,
polymorphic_identity="manager",
)
- car_mapper = mapper(
- Car, cars, properties={"employee": relationship(person_mapper)}
- )
+ mapper(Car, cars, properties={"employee": relationship(person_mapper)})
session = create_session()
polymorphic_on=people.c.type,
polymorphic_identity="person",
)
- engineer_mapper = mapper(
+ mapper(
Engineer,
engineers,
inherits=person_mapper,
inherits=person_mapper,
polymorphic_identity="manager",
)
- car_mapper = mapper(
+ mapper(
Car,
cars,
properties={
polymorphic_on=car_join.c.type,
polymorphic_identity="car",
)
- offroad_car_mapper = mapper(
+ mapper(
Offraod_Car,
offroad_cars,
inherits=car_mapper,
polymorphic_identity="person",
properties={"car": relationship(car_mapper)},
)
- engineer_mapper = mapper(
+ mapper(
Engineer,
engineers,
inherits=person_mapper,
polymorphic_identity="engineer",
)
- manager_mapper = mapper(
+ mapper(
Manager,
managers,
inherits=person_mapper,
)
session = create_session()
- basic_car = Car(name="basic")
- offroad_car = Offraod_Car(name="offroad")
for i in range(1, 4):
if i % 2:
inherits=person_mapper,
polymorphic_identity="engineer",
)
- manager_mapper = mapper(
+ mapper(
Manager,
managers,
inherits=person_mapper,
polymorphic_identity="manager",
)
- car_mapper = mapper(
+ mapper(
Car,
cars,
properties={
# this particular adapt used to cause a recursion overflow;
# added here for testing
- e = exists([Car.owner], Car.owner == employee_join.c.person_id)
Query(Person)._adapt_clause(employee_join, False, False)
r = (
with_polymorphic=("*", pu_Engineer),
)
- mapper_Manager = mapper(
+ mapper(
Manager,
table_Manager,
inherit_condition=table_Manager.c.id == table_Engineer.c.id,
class MissingPolymorphicOnTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
- tablea = Table(
+ Table(
"tablea",
metadata,
Column(
),
Column("adata", String(50)),
)
- tableb = Table(
+ Table(
"tableb",
metadata,
Column(
Column("aid", Integer, ForeignKey("tablea.id")),
Column("data", String(50)),
)
- tablec = Table(
+ Table(
"tablec",
metadata,
Column("id", Integer, ForeignKey("tablea.id"), primary_key=True),
Column("cdata", String(50)),
)
- tabled = Table(
+ Table(
"tabled",
metadata,
Column("id", Integer, ForeignKey("tablec.id"), primary_key=True),
pass
def _roundtrip(self):
- Person, User = self.classes.Person, self.classes.User
+ User = self.classes.User
sess = Session()
u1 = User()
u2 = User()
"end though",
)
def test_join_w_eager_w_any(self):
- A, B, C, D, E = (
- self.classes.A,
- self.classes.B,
- self.classes.C,
- self.classes.D,
- self.classes.E,
- )
+ B, C, D = (self.classes.B, self.classes.C, self.classes.D)
s = Session(testing.db)
b = B(ds=[D()])
class PolymorphicOnNotLocalTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
- t1 = Table(
+ Table(
"t1",
metadata,
Column(
Column("x", String(10)),
Column("q", String(10)),
)
- t2 = Table(
+ Table(
"t2",
metadata,
Column(
)
def test_polymorphic_on_non_expr_prop(self):
- t2, t1 = self.tables.t2, self.tables.t1
+ t2 = self.tables.t2
Parent = self.classes.Parent
- t1t2_join = select([t1.c.x], from_obj=[t1.join(t2)]).alias()
-
- def go():
- interface_m = mapper(
- Parent, t2, polymorphic_on=lambda: "hi", polymorphic_identity=0
- )
-
assert_raises_message(
sa_exc.ArgumentError,
"Only direct column-mapped property or "
"SQL expression can be passed for polymorphic_on",
- go,
+ mapper,
+ Parent,
+ t2,
+ polymorphic_on=lambda: "hi",
+ polymorphic_identity=0,
)
def test_polymorphic_on_not_present_col(self):
def go():
t1t2_join_2 = select([t1.c.q], from_obj=[t1.join(t2)]).alias()
- interface_m = mapper(
+ mapper(
Parent,
t2,
polymorphic_on=t1t2_join.c.x,
# if with_polymorphic, but its not present, not OK
def go():
t1t2_join_2 = select([t1.c.q], from_obj=[t1.join(t2)]).alias()
- interface_m = mapper(
+ mapper(
Parent,
t2,
polymorphic_on=t1t2_join.c.x,
def test_polymorphic_on_expr_implicit_map_no_label_single(self):
"""test that single_table_criterion is propagated
with a standalone expr"""
- t2, t1 = self.tables.t2, self.tables.t1
+ t1 = self.tables.t1
Parent, Child = self.classes.Parent, self.classes.Child
expr = case([(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")])
mapper(Parent, t1, polymorphic_identity="parent", polymorphic_on=expr)
def test_polymorphic_on_expr_implicit_map_w_label_single(self):
"""test that single_table_criterion is propagated
with a standalone expr"""
- t2, t1 = self.tables.t2, self.tables.t1
+ t1 = self.tables.t1
Parent, Child = self.classes.Parent, self.classes.Child
expr = case(
[(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")]
self._roundtrip(parent_ident="p", child_ident="c")
def test_polymorphic_on_synonym(self):
- t2, t1 = self.tables.t2, self.tables.t1
- Parent, Child = self.classes.Parent, self.classes.Child
+ t1 = self.tables.t1
+ Parent = self.classes.Parent
cprop = column_property(t1.c.x)
assert_raises_message(
sa_exc.ArgumentError,
class Admin(User):
pass
- role_mapper = mapper(Role, roles)
+ mapper(Role, roles)
user_mapper = mapper(
User,
users,
)
},
)
- admin_mapper = mapper(Admin, admins, inherits=user_mapper)
+ mapper(Admin, admins, inherits=user_mapper)
sess = create_session()
adminrole = Role()
sess.add(adminrole)
class Admin(User):
pass
- role_mapper = mapper(Role, roles)
+ mapper(Role, roles)
user_mapper = mapper(
User,
users,
},
)
- admin_mapper = mapper(Admin, admins, inherits=user_mapper)
+ mapper(Admin, admins, inherits=user_mapper)
# create roles
adminrole = Role("admin")
Column("id", Integer, ForeignKey("base.id"), primary_key=True),
Column("order_id", Integer, ForeignKey("order.foo")),
)
- order_table = Table(
- "order", m, Column("id", Integer, primary_key=True)
- )
+ Table("order", m, Column("id", Integer, primary_key=True))
class Base(object):
pass
class PKDiscriminatorTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
- parents = Table(
+ Table(
"parents",
metadata,
Column(
Column("name", String(60)),
)
- children = Table(
+ Table(
"children",
metadata,
Column("id", Integer, ForeignKey("parents.id"), primary_key=True),
class NameConflictTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
- content = Table(
+ Table(
"content",
metadata,
Column(
),
Column("type", String(30)),
)
- foo = Table(
+ Table(
"foo",
metadata,
Column("id", Integer, ForeignKey("content.id"), primary_key=True),
"pjoin",
)
employee_mapper = mapper(Employee, pjoin, polymorphic_on=pjoin.c.type)
- manager_mapper = mapper(
+ mapper(
Manager,
managers_table,
inherits=employee_mapper,
concrete=True,
polymorphic_identity="manager",
)
- engineer_mapper = mapper(
+ mapper(
Engineer,
engineers_table,
inherits=employee_mapper,
"pjoin2",
)
employee_mapper = mapper(Employee, pjoin, polymorphic_on=pjoin.c.type)
- manager_mapper = mapper(
+ mapper(
Manager,
managers_table,
inherits=employee_mapper,
concrete=True,
polymorphic_identity="engineer",
)
- hacker_mapper = mapper(
+ mapper(
Hacker,
hackers_table,
inherits=engineer_mapper,
with_polymorphic=("*", pjoin),
polymorphic_on=pjoin.c.type,
)
- manager_mapper = mapper(
+ mapper(
Manager,
managers_table,
inherits=employee_mapper,
concrete=True,
polymorphic_identity="engineer",
)
- hacker_mapper = mapper(
+ mapper(
Hacker,
hackers_table,
inherits=engineer_mapper,
employee_mapper = mapper(
Employee, employees_table, polymorphic_identity="employee"
)
- manager_mapper = mapper(
+ mapper(
Manager,
managers_table,
inherits=employee_mapper,
concrete=True,
polymorphic_identity="engineer",
)
- hacker_mapper = mapper(
+ mapper(
Hacker,
hackers_table,
inherits=engineer_mapper,
properties={"employees": relationship(Employee)},
)
employee_mapper = mapper(Employee, pjoin, polymorphic_on=pjoin.c.type)
- manager_mapper = mapper(
+ mapper(
Manager,
managers_table,
inherits=employee_mapper,
concrete=True,
polymorphic_identity="manager",
)
- engineer_mapper = mapper(
+ mapper(
Engineer,
engineers_table,
inherits=employee_mapper,
polymorphic_on=pjoin.c.type,
polymorphic_identity="location",
)
- office_mapper = mapper(
+ mapper(
Office,
offices_table,
inherits=location_mapper,
concrete=True,
polymorphic_identity="office",
)
- refugee_mapper = mapper(
+ mapper(
Refugee,
refugees_table,
inherits=location_mapper,
def _generate_round_trip_test(use_unions=False, use_joins=False):
def test_roundtrip(self):
- publication_mapper = mapper(Publication, self.tables.publication)
+ mapper(Publication, self.tables.publication)
- issue_mapper = mapper(
+ mapper(
Issue,
self.tables.issue,
properties={
},
)
- location_name_mapper = mapper(LocationName, self.tables.location_name)
+ mapper(LocationName, self.tables.location_name)
- location_mapper = mapper(
+ mapper(
Location,
self.tables.location,
properties={
},
)
- page_size_mapper = mapper(PageSize, self.tables.page_size)
+ mapper(PageSize, self.tables.page_size)
- magazine_mapper = mapper(
+ mapper(
Magazine,
self.tables.magazine,
properties={
},
)
- classified_page_mapper = mapper(
+ mapper(
ClassifiedPage,
self.tables.classified_page,
inherits=magazine_page_mapper,
},
)
- table1b_mapper = mapper(
- Table1B, inherits=table1_mapper, polymorphic_identity="table1b"
- )
+ mapper(Table1B, inherits=table1_mapper, polymorphic_identity="table1b")
- table2_mapper = mapper(
+ mapper(
Table2,
table2,
inherits=table1_mapper,
polymorphic_identity="table2",
)
- table3_mapper = mapper(
+ mapper(
Table3,
table3,
inherits=table1_mapper,
),
]
- pointy = employees[0]
- jsmith = employees[-1]
dilbert = employees[1]
session = create_session()
def go():
# assert that only primary table is queried for
# already-present-in-session
- d = (
+ (
session.query(Person)
.filter(getattr(Person, person_attribute_name) == "dilbert")
.first()
polymorphic_identity="product",
)
- detail_mapper = mapper(
- Detail, inherits=product_mapper, polymorphic_identity="detail"
- )
+ mapper(Detail, inherits=product_mapper, polymorphic_identity="detail")
- assembly_mapper = mapper(
+ mapper(
Assembly, inherits=product_mapper, polymorphic_identity="assembly"
)
- specification_mapper = mapper(
+ mapper(
SpecLine,
specification_table,
properties=dict(
polymorphic_identity="product",
)
- detail_mapper = mapper(
- Detail, inherits=product_mapper, polymorphic_identity="detail"
- )
+ mapper(Detail, inherits=product_mapper, polymorphic_identity="detail")
- specification_mapper = mapper(
+ mapper(
SpecLine,
specification_table,
properties=dict(
polymorphic_on=products_table.c.product_type,
polymorphic_identity="product",
)
- detail_mapper = mapper(
- Detail, inherits=product_mapper, polymorphic_identity="detail"
- )
- assembly_mapper = mapper(
+ mapper(Detail, inherits=product_mapper, polymorphic_identity="detail")
+ mapper(
Assembly, inherits=product_mapper, polymorphic_identity="assembly"
)
- specification_mapper = mapper(
+ mapper(
SpecLine,
specification_table,
properties=dict(
),
),
)
- raster_document_mapper = mapper(
+ mapper(
RasterDocument,
inherits=document_mapper,
polymorphic_identity="raster_document",
polymorphic_on=products_table.c.product_type,
polymorphic_identity="product",
)
- detail_mapper = mapper(
- Detail, inherits=product_mapper, polymorphic_identity="detail"
- )
- assembly_mapper = mapper(
+ mapper(Detail, inherits=product_mapper, polymorphic_identity="detail")
+ mapper(
Assembly, inherits=product_mapper, polymorphic_identity="assembly"
)
),
),
)
- raster_document_mapper = mapper(
+ mapper(
RasterDocument,
inherits=document_mapper,
polymorphic_identity="raster_document",
def test_five(self):
"""tests the late compilation of mappers"""
- specification_mapper = mapper(
+ mapper(
SpecLine,
specification_table,
properties=dict(
),
)
- product_mapper = mapper(
+ mapper(
Product,
products_table,
polymorphic_on=products_table.c.product_type,
},
)
- detail_mapper = mapper(
- Detail, inherits=Product, polymorphic_identity="detail"
- )
+ mapper(Detail, inherits=Product, polymorphic_identity="detail")
- document_mapper = mapper(
+ mapper(
Document,
documents_table,
polymorphic_on=documents_table.c.document_type,
),
)
- raster_document_mapper = mapper(
+ mapper(
RasterDocument,
inherits=Document,
polymorphic_identity="raster_document",
)
- assembly_mapper = mapper(
- Assembly, inherits=Product, polymorphic_identity="assembly"
- )
+ mapper(Assembly, inherits=Product, polymorphic_identity="assembly")
session = create_session()
@classmethod
def define_tables(cls, metadata):
- people = Table(
+ Table(
"people",
metadata,
Column(
Column("type", String(30)),
)
- engineers = Table(
+ Table(
"engineers",
metadata,
Column(
Column("reports_to_id", Integer, ForeignKey("managers.person_id")),
)
- managers = Table(
+ Table(
"managers",
metadata,
Column(
@classmethod
def define_tables(cls, metadata):
- people = Table(
+ Table(
"people",
metadata,
Column(
Column("type", String(30)),
)
- engineers = Table(
+ Table(
"engineers",
metadata,
Column(
@classmethod
def define_tables(cls, metadata):
- organizations = Table(
+ Table(
"organizations",
metadata,
Column(
Column("name", String(50)),
)
- engineers_to_org = Table(
+ Table(
"engineers_to_org",
metadata,
Column("org_id", Integer, ForeignKey("organizations.id")),
Column("engineer_id", Integer, ForeignKey("engineers.person_id")),
)
- people = Table(
+ Table(
"people",
metadata,
Column(
Column("type", String(30)),
)
- engineers = Table(
+ Table(
"engineers",
metadata,
Column(
)
def test_joinedload(self):
- Root, Intermediate, Sub1, Target = (
+ Root, Intermediate, Sub1 = (
self.classes.Root,
self.classes.Intermediate,
self.classes.Sub1,
- self.classes.Target,
)
sess = Session()
mapper(D, d, inherits=A)
def _two_join_fixture(self):
- A, B, C, D = (
- self.classes.A,
- self.classes.B,
- self.classes.C,
- self.classes.D,
- )
+ B, C, D = (self.classes.B, self.classes.C, self.classes.D)
s = Session()
return (
s.query(B.name, C.name, D.name)
)
def test_two_joins_adaption(self):
- a, b, c, d = self.tables.a, self.tables.b, self.tables.c, self.tables.d
+ a, c, d = self.tables.a, self.tables.c, self.tables.d
q = self._two_join_fixture()
btoc = q._from_obj[0].left
)
def _fixture_one(self):
- Employee, JuniorEngineer, Manager, Engineer = (
- self.classes.Employee,
+ JuniorEngineer, Manager, Engineer = (
self.classes.JuniorEngineer,
self.classes.Manager,
self.classes.Engineer,
assert row.employee_id == e1.employee_id
def test_multi_qualification(self):
- JuniorEngineer, Manager, Engineer = (
- self.classes.JuniorEngineer,
- self.classes.Manager,
- self.classes.Engineer,
- )
+ Manager, Engineer = (self.classes.Manager, self.classes.Engineer)
session, m1, e1, e2 = self._fixture_one()
def test_assert_joinedload_sql(self):
Parent = self.classes.Parent
- Child = self.classes.Child
s = Session()
session.add(p)
session.flush()
session.expunge_all()
- obj = session.query(Left).filter_by(data="l1").one()
+ session.query(Left).filter_by(data="l1").one()
class EagerTest3(fixtures.MappedTest):
self.tables.derivedII,
)
- commentMapper = mapper(Comment, comments)
+ mapper(Comment, comments)
baseMapper = mapper(
Base,
pass
def test_nested_joins(self):
- task, Task_Type, Joined, prj, task_type, msg = (
+ task, Task_Type, Joined, task_type, msg = (
self.tables.task,
self.classes.Task_Type,
self.classes.Joined,
- self.tables.prj,
self.tables.task_type,
self.tables.msg,
)
mapper(Task_Type, task_type)
- tsk_cnt_join = sa.outerjoin(prj, task, task.c.prj_id == prj.c.id)
-
j = sa.outerjoin(task, msg, task.c.id == msg.c.task_id)
jj = sa.select(
[
tx2 = Transaction(name="tx2")
acc1 = Account(name="acc1")
- ent11 = Entry(name="ent11", account=acc1, transaction=tx1)
- ent12 = Entry(name="ent12", account=acc1, transaction=tx2)
+ Entry(name="ent11", account=acc1, transaction=tx1)
+ Entry(name="ent12", account=acc1, transaction=tx2)
acc2 = Account(name="acc2")
- ent21 = Entry(name="ent21", account=acc2, transaction=tx1)
- ent22 = Entry(name="ent22", account=acc2, transaction=tx2)
+ Entry(name="ent21", account=acc2, transaction=tx1)
+ Entry(name="ent22", account=acc2, transaction=tx2)
session.add(acc1)
session.flush()
try:
Foo().collection
assert True
- except sa_exc.ArgumentError as e:
+ except sa_exc.ArgumentError:
assert False
def test_last_known_tracking(self):
)
hi = Bar(name="hi")
there = Bar(name="there")
- old = Bar(name="old")
- new = Bar(name="new")
f = Foo()
eq_(
attributes.get_state_history(
fn()
if useobject:
- D = make_d()
+ make_d()
instrument_d()
yield classes, canary
)
def test_delete_orphan_without_delete(self):
- User, Address = self.classes.User, self.classes.Address
- users, addresses = self.tables.users, self.tables.addresses
+ Address = self.classes.Address
assert_raises_message(
sa_exc.SAWarning,
)
def test_cascade_unicode(self):
- User, Address = self.classes.User, self.classes.Address
- users, addresses = self.tables.users, self.tables.addresses
+ Address = self.classes.Address
rel = relationship(Address)
rel.cascade = util.u("save-update, merge, expunge")
)
def test_none_o2m_collection_assignment(self):
- User, Address = self.classes.User, self.classes.Address
+ User = self.classes.User
s = Session()
u1 = User(name="u", addresses=[None])
s.add(u1)
eq_(u1.addresses, [None])
def test_none_o2m_collection_append(self):
- User, Address = self.classes.User, self.classes.Address
+ User = self.classes.User
s = Session()
u1 = User(name="u")
def test_single_parent_raise(self):
T2, T1 = self.classes.T2, self.classes.T1
- sess = create_session()
-
y = T2(data="T2a")
- x = T1(data="T1a", t2=y)
+ T1(data="T1a", t2=y)
assert_raises(sa_exc.InvalidRequestError, T1, data="T1b", t2=y)
def test_single_parent_backref(self):
T2, T3 = self.classes.T2, self.classes.T3
- sess = create_session()
-
y = T3(data="T3a")
x = T2(data="T2a", t3=y)
)
mapper(B, b)
- sess = create_session()
b1 = B(data="b1")
- a1 = A(data="a1", bs=[b1])
+ A(data="a1", bs=[b1])
assert_raises(sa_exc.InvalidRequestError, A, data="a2", bs=[b1])
)
mapper(B, b)
- sess = create_session()
b1 = B(data="b1")
a1 = A(data="a1", bs=[b1])
)
c1 = Core()
if detach_event:
- r1 = RelatedOne(cores=[c1])
- r2 = RelatedTwo(cores=[c1])
+ RelatedOne(cores=[c1])
+ RelatedTwo(cores=[c1])
else:
if r1_present:
- r1 = RelatedOne(cores=[c1])
+ RelatedOne(cores=[c1])
if r2_present:
- r2 = RelatedTwo(cores=[c1])
+ RelatedTwo(cores=[c1])
if persistent:
s = Session()
order_join = order.select().alias("pjoin")
- order_mapper = mapper(
+ mapper(
Order,
order,
with_polymorphic=("*", order_join),
order_join = order.select().alias("pjoin")
- order_mapper = mapper(
+ mapper(
Order,
order,
with_polymorphic=("*", order_join),
class Host(object):
pass
- node_mapper = mapper(Node, node_table)
- host_mapper = mapper(Host, host_table)
- node_name_mapper = mapper(
+ mapper(Node, node_table)
+ mapper(Host, host_table)
+ mapper(
NodeName,
node_name_table,
properties={
eq_(e.end, Point(18, 4))
def test_not_none(self):
- Graph, Edge, Point = (
- self.classes.Graph,
- self.classes.Edge,
- self.classes.Point,
- )
+ Edge = self.classes.Edge
# current contract. the composite is None
# when hasn't been populated etc. on a
class ExcludedDefaultsTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
- dt = Table(
+ Table(
"dt",
metadata,
Column(
def go():
result = q.all()
o2 = result[2]
- x = o2.description
+ o2.description
self.sql_eq_(
go,
def test_save(self):
Order, orders = self.classes.Order, self.tables.orders
- m = mapper(
+ mapper(
Order,
orders,
properties={"description": deferred(orders.c.description)},
"the Session._enable_transaction_accounting parameter is "
"deprecated"
):
- s = Session(_enable_transaction_accounting=False)
+ Session(_enable_transaction_accounting=False)
def test_session_is_modified(self):
class Foo(self.DeclarativeBasic):
)
def test_info(self):
- users = self.tables.users
- Address = self.classes.Address
-
class MyComposite(object):
pass
self.sql_count_(1, go)
def test_kwarg_accepted(self):
- users, Address = self.tables.users, self.classes.Address
-
class DummyComposite(object):
def __init__(self, x, y):
pass
pass
instrumentation.register_class(Foo)
- d = attributes.register_attribute(
+ attributes.register_attribute(
Foo, "attr", uselist=True, typecallable=MyDict, useobject=True
)
class AddressUser(User):
pass
- m1 = mapper(User, users, polymorphic_identity="user")
+ mapper(User, users, polymorphic_identity="user")
m2 = mapper(
AddressUser,
addresses,
def test_splice_onto_ac(self):
A = self.classes.A
B = self.classes.B
- C1 = self.classes.C1
b_table = self.tables.b
c1_table = self.tables.c1
event.listen(m, "load", canary.listen5, insert=True)
event.listen(m, "load", canary.listen6, propagate=True, insert=True)
- u1 = User()
- state = u1._sa_instance_state
+ User()
m.dispatch.before_insert(arg, arg, arg)
m.class_manager.dispatch.load(arg, arg)
eq_(
s = Session()
with self.sql_execution_asserter() as asserter:
- q = s.query(User).filter_by(id=7).update({"name": "ed"})
+ s.query(User).filter_by(id=7).update({"name": "ed"})
asserter.assert_(
CompiledSQL(
"UPDATE users SET name=:name WHERE "
# note this deletes no rows
with self.sql_execution_asserter() as asserter:
- q = s.query(User).filter_by(id=10).delete()
+ s.query(User).filter_by(id=10).delete()
asserter.assert_(
CompiledSQL(
"DELETE FROM users WHERE "
s.expire_all()
def go():
- u = s.query(User).get(10) # get() refreshes
+ s.query(User).get(10) # get() refreshes
self.assert_sql_count(testing.db, go, 1)
self.assert_sql_count(testing.db, go, 0)
def go():
- u = s.query(User).get(10) # expire flag reset, so not expired
+ s.query(User).get(10) # expire flag reset, so not expired
self.assert_sql_count(testing.db, go, 0)
eq_(len(list(sess)), 4) # since addresses were gc'ed
userlist = sess.query(User).order_by(User.id).all()
- u = userlist[1]
eq_(self.static.user_address_result, userlist)
eq_(len(list(sess)), 9)
@classmethod
def define_tables(cls, metadata):
- people = Table(
+ Table(
"people",
metadata,
Column(
Column("type", String(30)),
)
- engineers = Table(
+ Table(
"engineers",
metadata,
Column(
User = self.classes.User
s, u1, a1 = self._fixture()
- u2 = User(addresses=[a1])
+ u2 = User(addresses=[a1]) # noqa
s.expire(a1)
u1.addresses.remove(a1)
self._assert_hasparent(a1)
def test_fresh_state_positive(self):
- User = self.classes.User
s, u1, a1 = self._fixture()
self._assert_hasparent(a1)
def test_fresh_state_negative(self):
- User = self.classes.User
s, u1, a1 = self._fixture()
u1.addresses.remove(a1)
def __init__(self):
inits.append((A, "__init__"))
- obj = A()
+ A()
eq_(inits, [(A, "__init__")])
def test_A(self):
self.register(A, inits)
- obj = A()
+ A()
eq_(inits, [(A, "init", A)])
def test_Ai(self):
self.register(A, inits)
- obj = A()
+ A()
eq_(inits, [(A, "init", A), (A, "__init__")])
def test_ai_B(self):
self.register(B, inits)
- obj = A()
+ A()
eq_(inits, [(A, "__init__")])
del inits[:]
- obj = B()
+ B()
eq_(inits, [(B, "init", B), (A, "__init__")])
def test_ai_Bi(self):
self.register(B, inits)
- obj = A()
+ A()
eq_(inits, [(A, "__init__")])
del inits[:]
- obj = B()
+ B()
eq_(inits, [(B, "init", B), (B, "__init__"), (A, "__init__")])
def test_Ai_bi(self):
inits.append((B, "__init__"))
super(B, self).__init__()
- obj = A()
+ A()
eq_(inits, [(A, "init", A), (A, "__init__")])
del inits[:]
- obj = B()
+ B()
eq_(inits, [(B, "__init__"), (A, "init", B), (A, "__init__")])
def test_Ai_Bi(self):
self.register(B, inits)
- obj = A()
+ A()
eq_(inits, [(A, "init", A), (A, "__init__")])
del inits[:]
- obj = B()
+ B()
eq_(inits, [(B, "init", B), (B, "__init__"), (A, "__init__")])
def test_Ai_B(self):
self.register(B, inits)
- obj = A()
+ A()
eq_(inits, [(A, "init", A), (A, "__init__")])
del inits[:]
- obj = B()
+ B()
eq_(inits, [(B, "init", B), (A, "__init__")])
def test_Ai_Bi_Ci(self):
self.register(C, inits)
- obj = A()
+ A()
eq_(inits, [(A, "init", A), (A, "__init__")])
del inits[:]
- obj = B()
+ B()
eq_(inits, [(B, "init", B), (B, "__init__"), (A, "__init__")])
del inits[:]
- obj = C()
+ C()
eq_(
inits,
[
self.register(C, inits)
- obj = A()
+ A()
eq_(inits, [(A, "init", A), (A, "__init__")])
del inits[:]
- obj = B()
+ B()
eq_(inits, [(B, "__init__"), (A, "init", B), (A, "__init__")])
del inits[:]
- obj = C()
+ C()
eq_(
inits,
[
self.register(C, inits)
- obj = A()
+ A()
eq_(inits, [(A, "init", A), (A, "__init__")])
del inits[:]
- obj = B()
+ B()
eq_(inits, [(A, "init", B), (A, "__init__")])
del inits[:]
- obj = C()
+ C()
eq_(inits, [(C, "init", C), (C, "__init__"), (A, "__init__")])
def test_Ai_B_Ci(self):
self.register(C, inits)
- obj = A()
+ A()
eq_(inits, [(A, "init", A), (A, "__init__")])
del inits[:]
- obj = B()
+ B()
eq_(inits, [(B, "init", B), (A, "__init__")])
del inits[:]
- obj = C()
+ C()
eq_(inits, [(C, "init", C), (C, "__init__"), (A, "__init__")])
def test_Ai_B_C(self):
self.register(C, inits)
- obj = A()
+ A()
eq_(inits, [(A, "init", A), (A, "__init__")])
del inits[:]
- obj = B()
+ B()
eq_(inits, [(B, "init", B), (A, "__init__")])
del inits[:]
- obj = C()
+ C()
eq_(inits, [(C, "init", C), (A, "__init__")])
def test_A_Bi_C(self):
self.register(C, inits)
- obj = A()
+ A()
eq_(inits, [(A, "init", A)])
del inits[:]
- obj = B()
+ B()
eq_(inits, [(B, "init", B), (B, "__init__")])
del inits[:]
- obj = C()
+ C()
eq_(inits, [(C, "init", C), (B, "__init__")])
def test_A_B_Ci(self):
self.register(C, inits)
- obj = A()
+ A()
eq_(inits, [(A, "init", A)])
del inits[:]
- obj = B()
+ B()
eq_(inits, [(B, "init", B)])
del inits[:]
- obj = C()
+ C()
eq_(inits, [(C, "init", C), (C, "__init__")])
def test_A_B_C(self):
self.register(C, inits)
- obj = A()
+ A()
eq_(inits, [(A, "init", A)])
del inits[:]
- obj = B()
+ B()
eq_(inits, [(B, "init", B)])
del inits[:]
- obj = C()
+ C()
eq_(inits, [(C, "init", C)])
def test_defaulted_init(self):
def __init__(self, x):
pass
- m = mapper(A, self.fixture())
+ mapper(A, self.fixture())
# B is not mapped in the current implementation
assert_raises(sa.orm.exc.UnmappedClassError, class_mapper, B)
a = A()
p_a = pickle.dumps(a)
- re_a = pickle.loads(p_a)
+ pickle.loads(p_a)
finally:
del A
def test_kw_only_args(self):
cls, canary = self._kw_only_fixture()
- a = cls("a", b="b", c="c")
+ cls("a", b="b", c="c")
eq_(canary, [(("a",), {"b": "b", "c": "c"})])
def test_kw_plus_posn_args(self):
cls, canary = self._kw_plus_posn_fixture()
- a = cls("a", 1, 2, 3, b="b", c="c")
+ cls("a", 1, 2, 3, b="b", c="c")
eq_(canary, [(("a", 1, 2, 3), {"b": "b", "c": "c"})])
def test_kw_only_args_plus_opt(self):
cls, canary = self._kw_opt_fixture()
- a = cls("a", b="b")
+ cls("a", b="b")
eq_(canary, [(("a",), {"b": "b", "c": "c"})])
canary[:] = []
- a = cls("a", b="b", c="d")
+ cls("a", b="b", c="d")
eq_(canary, [(("a",), {"b": "b", "c": "d"})])
def test_kw_only_sig(self):
)
def test_left_is_none_and_query_has_no_entities(self):
- User = self.classes.User
Address = self.classes.Address
sess = create_session()
)
def test_single_prop_1(self):
- Item, Order, User, Address = (
- self.classes.Item,
- self.classes.Order,
- self.classes.User,
- self.classes.Address,
- )
+ User = self.classes.User
sess = create_session()
self.assert_compile(
)
def test_single_prop_2(self):
- Item, Order, User, Address = (
- self.classes.Item,
- self.classes.Order,
- self.classes.User,
- self.classes.Address,
- )
+ Order, User = (self.classes.Order, self.classes.User)
sess = create_session()
self.assert_compile(
)
def test_single_prop_3(self):
- Item, Order, User, Address = (
- self.classes.Item,
- self.classes.Order,
- self.classes.User,
- self.classes.Address,
- )
+ Order, User = (self.classes.Order, self.classes.User)
sess = create_session()
oalias1 = aliased(Order)
)
def test_single_prop_4(self):
- Item, Order, User, Address = (
- self.classes.Item,
- self.classes.Order,
- self.classes.User,
- self.classes.Address,
- )
+ Order, User, = (self.classes.Order, self.classes.User)
sess = create_session()
oalias1 = aliased(Order)
)
def test_single_prop_5(self):
- Item, Order, User, Address = (
- self.classes.Item,
- self.classes.Order,
- self.classes.User,
- self.classes.Address,
- )
+ Order, User, = (self.classes.Order, self.classes.User)
sess = create_session()
self.assert_compile(
)
def test_single_prop_6(self):
- Item, Order, User, Address = (
- self.classes.Item,
- self.classes.Order,
- self.classes.User,
- self.classes.Address,
- )
+ User = self.classes.User
sess = create_session()
ualias = aliased(User)
)
def test_single_prop_7(self):
- Item, Order, User, Address = (
- self.classes.Item,
- self.classes.Order,
- self.classes.User,
- self.classes.Address,
- )
+ Order, User = (self.classes.Order, self.classes.User)
sess = create_session()
# this query is somewhat nonsensical. the old system didn't render a
)
def test_single_prop_8(self):
- Item, Order, User, Address = (
- self.classes.Item,
- self.classes.Order,
- self.classes.User,
- self.classes.Address,
- )
+ Order, User, = (self.classes.Order, self.classes.User)
sess = create_session()
# same as before using an aliased() for User as well
)
def test_single_prop_9(self):
- Item, Order, User, Address = (
- self.classes.Item,
- self.classes.Order,
- self.classes.User,
- self.classes.Address,
- )
+ User = self.classes.User
sess = create_session()
self.assert_compile(
)
def test_single_prop_10(self):
- Item, Order, User, Address = (
- self.classes.Item,
- self.classes.Order,
- self.classes.User,
- self.classes.Address,
- )
+ User, Address = (self.classes.User, self.classes.Address)
sess = create_session()
self.assert_compile(
)
def test_single_prop_11(self):
- Item, Order, User, Address = (
+ Item, Order, User, = (
self.classes.Item,
self.classes.Order,
self.classes.User,
- self.classes.Address,
)
sess = create_session()
)
def test_single_prop_12(self):
- Item, Order, User, Address = (
- self.classes.Item,
+ Order, User, Address = (
self.classes.Order,
self.classes.User,
self.classes.Address,
)
def test_single_prop_13(self):
- Item, Order, User, Address = (
- self.classes.Item,
+ Order, User, Address = (
self.classes.Order,
self.classes.User,
self.classes.Address,
def test_clause_present_in_froms_twice_w_onclause(self):
# test [ticket:4584]
- Order, Address, Dingaling, User = (
+ Order, Address, User = (
self.classes.Order,
self.classes.Address,
- self.classes.Dingaling,
self.classes.User,
)
def test_clause_present_in_froms_twice_wo_onclause(self):
# test [ticket:4584]
- Order, Address, Dingaling, User = (
- self.classes.Order,
+ Address, Dingaling, User = (
self.classes.Address,
self.classes.Dingaling,
self.classes.User,
sess = create_session()
# no arg error
- result = (
+ (
sess.query(User)
.join("orders", aliased=True)
.order_by(Order.id)
class MultiplePathTest(fixtures.MappedTest, AssertsCompiledSQL):
@classmethod
def define_tables(cls, metadata):
- t1 = Table(
+ Table(
"t1",
metadata,
Column(
),
Column("data", String(30)),
)
- t2 = Table(
+ Table(
"t2",
metadata,
Column(
Column("data", String(30)),
)
- t1t2_1 = Table(
+ Table(
"t1t2_1",
metadata,
Column("t1id", Integer, ForeignKey("t1.id")),
Column("t2id", Integer, ForeignKey("t2.id")),
)
- t1t2_2 = Table(
+ Table(
"t1t2_2",
metadata,
Column("t1id", Integer, ForeignKey("t1.id")),
@classmethod
def define_tables(cls, metadata):
- nodes = Table(
+ Table(
"nodes",
metadata,
Column(
Column("parent_id", Integer, ForeignKey("nodes.id")),
)
- sub_table = Table(
+ Table(
"sub_table",
metadata,
Column(
Column("node_id", Integer, ForeignKey("nodes.id")),
)
- assoc_table = Table(
+ Table(
"assoc_table",
metadata,
Column("left_id", Integer, ForeignKey("nodes.id")),
def test_join_parent_child(self):
Parent = self.classes.Parent
- npc = self.npc
+
sess = Session()
self.assert_compile(
sess.query(Parent)
@classmethod
def define_tables(cls, metadata):
- nodes = Table(
+ Table(
"nodes",
metadata,
Column(
Column("data", String(30)),
)
- node_to_nodes = Table(
+ Table(
"node_to_nodes",
metadata,
Column(
assert m.configured is False
assert sa.orm.mapperlib.Mapper._new_mappers is True
- u = User()
+ User()
assert User.addresses
assert sa.orm.mapperlib.Mapper._new_mappers is False
assert not m.configured
configure_mappers()
- m2 = mapper(
+ mapper(
Address,
addresses,
properties={"user": relationship(User, backref="addresses")},
m1 = mapper(User, users)
User()
- m2 = mapper(
+ mapper(
Address,
addresses,
properties={"user": relationship(User, backref="addresses")},
inherits=User,
properties={"address_id": addresses.c.id},
)
- m3 = mapper(Address, addresses, properties={"foo": relationship(m2)})
+ mapper(Address, addresses, properties={"foo": relationship(m2)})
# add property using annotated User.name,
# needs to be deannotated
m.add_property("x", column_property(User.name + "name"))
class MyUser(User):
pass
- m1 = mapper(
+ mapper(
User,
users,
polymorphic_on=users.c.name,
super(MyFakeProperty, self).post_instrument_class(mapper)
configure_mappers()
- m1 = mapper(
- User, users, properties={"name": MyFakeProperty(users.c.name)}
- )
- m2 = mapper(Address, addresses)
+ mapper(User, users, properties={"name": MyFakeProperty(users.c.name)})
+ mapper(Address, addresses)
configure_mappers()
sa.orm.clear_mappers()
super(MyFakeProperty, self).post_instrument_class(mapper)
configure_mappers()
- m1 = mapper(
- User, users, properties={"name": MyFakeProperty(users.c.name)}
- )
- m2 = mapper(Address, addresses)
+ mapper(User, users, properties={"name": MyFakeProperty(users.c.name)})
+ mapper(Address, addresses)
configure_mappers()
def test_reconstructor(self):
Thing, Human = self.classes.Thing, self.classes.Human
session = create_session()
- human = (
+ human = ( # noqa
session.query(Human).options(sa.orm.joinedload("thing")).first()
- ) # noqa
+ )
session.expunge_all()
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
Thing, Human = self.classes.Thing, self.classes.Human
session = create_session()
- human = (
+ human = ( # noqa
session.query(Human).options(sa.orm.joinedload("thing")).first()
- ) # noqa
+ )
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
Thing, Human = self.classes.Thing, self.classes.Human
session = create_session()
- result = (
+ result = ( # noqa
session.query(Human).add_entity(Thing).join("thing").first()
- ) # noqa
+ )
session.expunge_all()
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
Thing, Human = self.classes.Thing, self.classes.Human
session = create_session()
- result = (
+ result = ( # noqa
session.query(Human).add_entity(Thing).join("thing").first()
- ) # noqa
+ )
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
)
mapper(User, users)
s = Session()
- u1 = s.query(User).first()
+ u1 = s.query(User).first() # noqa
a1 = (
s.query(Address)
.filter_by(id=1)
"Instance <User.*> is already pending in this Session yet is "
"being merged again; this is probably not what you want to do"
):
- u2 = sess.merge(u)
+ sess.merge(u)
def test_warn_transient_already_pending_pk(self):
User, users = self.classes.User, self.tables.users
"Instance <User.*> is already pending in this Session yet is "
"being merged again; this is probably not what you want to do"
):
- u2 = sess.merge(u)
+ sess.merge(u)
def test_transient_to_pending_collection(self):
User, Address, addresses, users = (
# attribute maintains modified state.
# (usually autoflush would have happened
# here anyway).
- u4 = sess.merge(User(id=2))
+ u4 = sess.merge(User(id=2)) # noqa
eq_(u3.__dict__["data"], "bar")
sess.flush()
# set it to None. this is actually
# a change so gets preserved.
u6.data = None
- u7 = sess.merge(User(id=3))
+ u7 = sess.merge(User(id=3)) # noqa
assert u6.__dict__["data"] is None
def test_merge_irregular_collection(self):
u2 = sess2.query(User).options(sa.orm.joinedload("addresses")).get(7)
sess3 = create_session()
- u3 = sess3.merge(u2, load=False)
+ u3 = sess3.merge(u2, load=False) # noqa
def go():
sess3.flush()
u2 = sess2.query(User).get(7)
sess3 = create_session()
- u3 = sess3.merge(u2, load=False)
+ u3 = sess3.merge(u2, load=False) # noqa
assert not sess3.dirty
def go():
)
sess = create_session(autoflush=True, autocommit=False)
- m = mapper(
+ mapper(
User,
users,
properties={
def go():
u1 = User(id=1, addresses=[Address(id=1), Address(id=2)])
- u2 = s.merge(u1)
+ s.merge(u1)
self.assert_sql_count(testing.db, go, 2)
@classmethod
def define_tables(cls, metadata):
- rocks_table = Table(
+ Table(
"rocks",
metadata,
Column("id", Integer, primary_key=True),
Column("description", String(10)),
)
- bugs_table = Table(
+ Table(
"bugs",
metadata,
Column("id", Integer, primary_key=True),
return [p.container for p in cls._fixture()]
def test_contains_eager_wpoly(self):
- ParentThing, DataContainer, Job, SubJob = (
- self.classes.ParentThing,
+ DataContainer, Job, SubJob = (
self.classes.DataContainer,
self.classes.Job,
self.classes.SubJob,
self.assert_sql_count(testing.db, go, 5)
def test_joinedload_wpoly(self):
- ParentThing, DataContainer, Job, SubJob = (
- self.classes.ParentThing,
+ DataContainer, Job, SubJob = (
self.classes.DataContainer,
self.classes.Job,
self.classes.SubJob,
self.assert_sql_count(testing.db, go, 5)
def test_joinedload_wsubclass(self):
- ParentThing, DataContainer, Job, SubJob = (
- self.classes.ParentThing,
+ DataContainer, SubJob = (
self.classes.DataContainer,
- self.classes.Job,
self.classes.SubJob,
)
s = Session(testing.db)
self.assert_sql_count(testing.db, go, 11)
def test_subquery_wsubclass(self):
- ParentThing, DataContainer, Job, SubJob = (
- self.classes.ParentThing,
+ DataContainer, SubJob = (
self.classes.DataContainer,
- self.classes.Job,
self.classes.SubJob,
)
s = Session(testing.db)
self.assert_sql_count(testing.db, go, 6)
def test_twolevel_subqueryload_wsubclass(self):
- ParentThing, DataContainer, Job, SubJob = (
+ ParentThing, DataContainer, SubJob = (
self.classes.ParentThing,
self.classes.DataContainer,
- self.classes.Job,
self.classes.SubJob,
)
s = Session(testing.db)
self.assert_sql_count(testing.db, go, 3)
def test_twolevel_joinedload_wsubclass(self):
- ParentThing, DataContainer, Job, SubJob = (
+ ParentThing, DataContainer, SubJob = (
self.classes.ParentThing,
self.classes.DataContainer,
- self.classes.Job,
self.classes.SubJob,
)
s = Session(testing.db)
self.assert_sql_count(testing.db, go, 5)
def test_any_wpoly(self):
- ParentThing, DataContainer, Job, SubJob = (
- self.classes.ParentThing,
+ DataContainer, Job, SubJob = (
self.classes.DataContainer,
self.classes.Job,
self.classes.SubJob,
)
def test_any_walias(self):
- ParentThing, DataContainer, Job, SubJob = (
- self.classes.ParentThing,
- self.classes.DataContainer,
- self.classes.Job,
- self.classes.SubJob,
- )
+ DataContainer, Job, = (self.classes.DataContainer, self.classes.Job)
Job_A = aliased(Job)
)
def test_join_wpoly(self):
- ParentThing, DataContainer, Job, SubJob = (
- self.classes.ParentThing,
+ DataContainer, Job, SubJob = (
self.classes.DataContainer,
self.classes.Job,
self.classes.SubJob,
)
def test_join_wsubclass(self):
- ParentThing, DataContainer, Job, SubJob = (
- self.classes.ParentThing,
+ DataContainer, SubJob = (
self.classes.DataContainer,
- self.classes.Job,
self.classes.SubJob,
)
)
def test_join_wpoly_innerjoin(self):
- ParentThing, DataContainer, Job, SubJob = (
- self.classes.ParentThing,
+ DataContainer, Job, SubJob = (
self.classes.DataContainer,
self.classes.Job,
self.classes.SubJob,
)
def test_join_walias(self):
- ParentThing, DataContainer, Job, SubJob = (
- self.classes.ParentThing,
- self.classes.DataContainer,
- self.classes.Job,
- self.classes.SubJob,
- )
+ DataContainer, Job, = (self.classes.DataContainer, self.classes.Job)
Job_A = aliased(Job)
)
def test_join_explicit_wpoly_noalias(self):
- ParentThing, DataContainer, Job, SubJob = (
- self.classes.ParentThing,
+ DataContainer, Job, SubJob = (
self.classes.DataContainer,
self.classes.Job,
self.classes.SubJob,
)
def test_join_explicit_wpoly_flat(self):
- ParentThing, DataContainer, Job, SubJob = (
- self.classes.ParentThing,
+ DataContainer, Job, SubJob = (
self.classes.DataContainer,
self.classes.Job,
self.classes.SubJob,
)
def test_join_explicit_wpoly_full_alias(self):
- ParentThing, DataContainer, Job, SubJob = (
- self.classes.ParentThing,
+ DataContainer, Job, SubJob = (
self.classes.DataContainer,
self.classes.Job,
self.classes.SubJob,
def test_chained(self):
User = self.classes.User
Order = self.classes.Order
- Item = self.classes.Item
sess = Session()
q = sess.query(User)
opt = self._option_fixture(User.orders).joinedload("items")
def test_missing_str_attr_of_type_subclass(self):
s = Session()
- wp = with_polymorphic(Person, [Manager], flat=True)
-
assert_raises_message(
sa.exc.ArgumentError,
r'Can\'t find property named "manager_name" on '
def test_modern_opt_getstate(self):
User = self.classes.User
- sess = Session()
- q = sess.query(User)
-
opt = self._option_fixture(User.addresses)
eq_(
opt.__getstate__(),
def test_no_mappers(self):
users = self.tables.users
- umapper = mapper(User, users)
+ mapper(User, users)
u1 = User(name="ed")
u1_pickled = pickle.dumps(u1, -1)
def test_no_instrumentation(self):
users = self.tables.users
- umapper = mapper(User, users)
+ mapper(User, users)
u1 = User(name="ed")
u1_pickled = pickle.dumps(u1, -1)
clear_mappers()
- umapper = mapper(User, users)
+ mapper(User, users)
u1 = pickle.loads(u1_pickled)
# this fails unless the InstanceState
eq_(opt.path, opt2.path)
u1 = sess.query(User).options(opt).first()
- u2 = pickle.loads(pickle.dumps(u1))
+ pickle.loads(pickle.dumps(u1))
def test_collection_setstate(self):
"""test a particular cycle that requires CollectionAdapter
.first()
)
- e2 = pickle.loads(pickle.dumps(emp))
+ pickle.loads(pickle.dumps(emp))
class PolymorphicDeferredTest(fixtures.MappedTest):
) # m2o
def test_tuple_labeling(self):
- users = self.tables.users
sess = create_session()
# test pickle + all the protocols !
def test_select_with_bindparam_offset_limit_w_cast(self):
User = self.classes.User
sess = create_session()
- q1 = (
- sess.query(self.classes.User)
- .order_by(self.classes.User.id)
- .limit(bindparam("n"))
- )
eq_(
list(
sess.query(User)
User = self.classes.User
Address = self.classes.Address
- Order = self.classes.Order
sess = create_session()
)
def _test_join_aliasing(self, sess):
- Employee, Company = self.classes.Employee, self.classes.Company
+ Employee = self.classes.Employee
eq_(
[
n
mapper(Child, child)
def test_joins_fully(self):
- Parent, Child = self.classes.Parent, self.classes.Child
+ Parent = self.classes.Parent
self.assert_compile(
Parent.children.property.strategy._lazywhere,
)
def test_lazyload(self):
- Network, Address = self.classes.Network, self.classes.Address
+ Network = self.classes.Network
session = Session(testing.db)
sess.commit()
def test_render_join(self):
- A, D = self.classes.A, self.classes.D
+ A = self.classes.A
sess = Session()
self.assert_compile(
sess.query(A).join(A.d),
)
def test_render_joinedload(self):
- A, D = self.classes.A, self.classes.D
+ A = self.classes.A
sess = Session()
self.assert_compile(
sess.query(A).options(joinedload(A.d)),
def test_render_lazyload(self):
from sqlalchemy.testing.assertsql import CompiledSQL
- A, D = self.classes.A, self.classes.D
+ A = self.classes.A
sess = Session()
a1 = sess.query(A).filter(A.name == "a1").first()
eq_(self.mapping[a.name], d.name if d is not None else None)
def test_joinedload(self):
- A, D = self.classes.A, self.classes.D
+ A = self.classes.A
sess = Session()
for a in sess.query(A).options(joinedload(A.d)):
eq_(self.mapping[a.name], d.name if d is not None else None)
def test_lazyload(self):
- A, D = self.classes.A, self.classes.D
+ A = self.classes.A
sess = Session()
for a in sess.query(A):
def test_config_errors(self):
Session = scoped_session(sa.orm.sessionmaker())
- s = Session()
+ s = Session() # noqa
assert_raises_message(
sa.exc.InvalidRequestError,
"Scoped session is already present",
class BaseRelationFromJoinedSubclassTest(_Polymorphic):
@classmethod
def define_tables(cls, metadata):
- people = Table(
+ Table(
"people",
metadata,
Column(
# to test fully, PK of engineers table must be
# named differently from that of people
- engineers = Table(
+ Table(
"engineers",
metadata,
Column(
Column("primary_language", String(50)),
)
- paperwork = Table(
+ Table(
"paperwork",
metadata,
Column(
Column("person_id", Integer, ForeignKey("people.person_id")),
)
- pages = Table(
+ Table(
"pages",
metadata,
Column(
session.commit()
close_all_sessions()
- d = session.query(Director).options(subqueryload("*")).first()
+ d = session.query(Director).options(subqueryload("*")).first() # noqa
assert len(list(session)) == 3
),
)
- movies = q.all()
+ movies = q.all() # noqa
# check number of persistent objects in session
eq_(len(list(s)), 5)
self.tables.people,
)
- m1 = mapper(PersonSite, peoplesites)
+ mapper(PersonSite, peoplesites)
m2 = mapper(
Person, people, properties={"sites": relationship(PersonSite)}
)
self.classes.User,
)
- m = mapper(
+ mapper(
User,
users,
properties=dict(
self.classes.User,
)
- m = mapper(
+ mapper(
User,
users,
properties=dict(
self.classes.User,
)
- m = mapper(
+ mapper(
User,
users,
properties=dict(
self.classes.User,
)
- m = mapper(
+ mapper(
User,
users,
properties=dict(
self.classes.User,
)
- m = mapper(
+ mapper(
User,
users,
properties=dict(
self.classes.User,
)
- m = mapper(
+ mapper(
User,
users,
properties=dict(
)
m1 = mapper(User, users)
- m2 = mapper(
+ mapper(
Address,
addresses,
properties=dict(
)
u = User(name="test")
- a = Address(email_address="testaddress", user=u)
+ Address(email_address="testaddress", user=u)
session = create_session()
session.add(u)
)
m2 = mapper(Address, addresses)
- m = mapper(
+ mapper(
User,
users,
properties={
sa.orm.clear_mappers()
- m = mapper(User, users)
+ mapper(User, users)
evt = Events()
event.listen(User, "before_insert", evt.before_insert)
event.listen(User, "after_insert", evt.after_insert)
self.classes.User,
)
- # TODO: put assertion in here !!!
- m = mapper(
+ mapper(
Address,
addresses,
properties=dict(
self.classes.User,
)
- m = mapper(
+ mapper(
Address,
addresses,
properties=dict(
self.classes.User,
)
- m = mapper(
+ mapper(
Address,
addresses,
properties=dict(
self.classes.User,
)
- m = mapper(
+ mapper(
Address,
addresses,
properties=dict(
mapper(Keyword, keywords)
- m = mapper(
+ mapper(
Item,
items,
properties=dict(
@classmethod
def define_tables(cls, metadata):
- t1 = Table("t1", metadata, Column("id", Integer, primary_key=True))
+ Table("t1", metadata, Column("id", Integer, primary_key=True))
- t2 = Table(
+ Table(
"t2",
metadata,
Column("id", Integer, primary_key=True),
class ExpressionUpdateTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
- data = Table(
+ Table(
"data",
metadata,
Column(
class Point(object):
pass
- table = self._fixture(Point)
+ self._fixture(Point)
alias = aliased(Point)
assert_raises(TypeError, alias)
def zero(self):
self.x, self.y = 0, 0
- table = self._fixture(Point)
+ self._fixture(Point)
alias = aliased(Point)
assert Point.zero
def max_x(cls):
return 100
- table = self._fixture(Point)
+ self._fixture(Point)
alias = aliased(Point)
assert Point.max_x
def max_x(self):
return 100
- table = self._fixture(Point)
+ self._fixture(Point)
alias = aliased(Point)
assert Point.max_x
def thing(self, arg):
return arg.center
- table = self._fixture(Point)
+ self._fixture(Point)
alias = aliased(Point)
assert Point.thing != (0, 0)
while True:
conn = engine.connect()
try:
- trans = conn.begin()
+ conn.begin()
for i in range(5):
conn.execute("SELECT 1+1")
gevent.sleep(random.random() * 1.01)
def test_index_against_text_inline(self):
metadata = MetaData()
idx = Index("y", text("some_function(q)"))
- x = Table("x", metadata, Column("q", String(50)), idx)
+ Table("x", metadata, Column("q", String(50)), idx)
self.assert_compile(
schema.CreateIndex(idx), "CREATE INDEX y ON x (some_function(q))"
"The select.autocommit parameter is deprecated and "
"will be removed in a future release."
):
- stmt = select([column("x")], autocommit=True)
+ select([column("x")], autocommit=True)
def test_select_for_update(self):
with testing.expect_deprecated(
"The select.for_update parameter is deprecated and "
"will be removed in a future release."
):
- stmt = select([column("x")], for_update=True)
+ select([column("x")], for_update=True)
@testing.provide_metadata
def test_table_useexisting(self):
class CaseSensitiveFunctionDeprecationsTest(fixtures.TestBase):
-
def setup(self):
self._registry = deepcopy(functions._registry)
self._case_sensitive_registry = deepcopy(
- functions._case_sensitive_registry)
+ functions._case_sensitive_registry
+ )
functions._registry.clear()
functions._case_sensitive_registry.clear()
functions._case_sensitive_registry = self._case_sensitive_registry
def test_case_sensitive(self):
- reg = functions._registry['_default']
- cs_reg = functions._case_sensitive_registry['_default']
+ reg = functions._registry["_default"]
+ cs_reg = functions._case_sensitive_registry["_default"]
class MYFUNC(GenericFunction):
type = DateTime
not_in_("MYFUNC", reg)
not_in_("MyFunc", reg)
in_("myfunc", cs_reg)
- eq_(set(cs_reg['myfunc'].keys()), set(['MYFUNC']))
+ eq_(set(cs_reg["myfunc"].keys()), set(["MYFUNC"]))
with testing.expect_deprecated(
"GenericFunction 'MyFunc' is already registered with"
"'MYFUNC' is switched into case-sensitive mode. "
"GenericFunction objects will be fully case-insensitive in a "
"future release.",
- regex=False
+ regex=False,
):
+
class MyFunc(GenericFunction):
type = Integer
not_in_("MYFUNC", reg)
not_in_("MyFunc", reg)
in_("myfunc", cs_reg)
- eq_(set(cs_reg['myfunc'].keys()), set(['MYFUNC', 'MyFunc']))
+ eq_(set(cs_reg["myfunc"].keys()), set(["MYFUNC", "MyFunc"]))
def test_replace_function_case_sensitive(self):
- reg = functions._registry['_default']
- cs_reg = functions._case_sensitive_registry['_default']
+ reg = functions._registry["_default"]
+ cs_reg = functions._case_sensitive_registry["_default"]
class replaceable_func(GenericFunction):
type = Integer
- identifier = 'REPLACEABLE_FUNC'
+ identifier = "REPLACEABLE_FUNC"
assert isinstance(func.REPLACEABLE_FUNC().type, Integer)
assert isinstance(func.Replaceable_Func().type, Integer)
not_in_("REPLACEABLE_FUNC", reg)
not_in_("Replaceable_Func", reg)
in_("replaceable_func", cs_reg)
- eq_(set(cs_reg['replaceable_func'].keys()), set(['REPLACEABLE_FUNC']))
+ eq_(set(cs_reg["replaceable_func"].keys()), set(["REPLACEABLE_FUNC"]))
with testing.expect_deprecated(
"GenericFunction 'Replaceable_Func' is already registered with"
"'REPLACEABLE_FUNC' is switched into case-sensitive mode. "
"GenericFunction objects will be fully case-insensitive in a "
"future release.",
- regex=False
+ regex=False,
):
+
class Replaceable_Func(GenericFunction):
type = DateTime
- identifier = 'Replaceable_Func'
+ identifier = "Replaceable_Func"
assert isinstance(func.REPLACEABLE_FUNC().type, Integer)
assert isinstance(func.Replaceable_Func().type, DateTime)
not_in_("REPLACEABLE_FUNC", reg)
not_in_("Replaceable_Func", reg)
in_("replaceable_func", cs_reg)
- eq_(set(cs_reg['replaceable_func'].keys()),
- set(['REPLACEABLE_FUNC', 'Replaceable_Func']))
+ eq_(
+ set(cs_reg["replaceable_func"].keys()),
+ set(["REPLACEABLE_FUNC", "Replaceable_Func"]),
+ )
with testing.expect_warnings(
"The GenericFunction 'REPLACEABLE_FUNC' is already registered and "
"is going to be overriden.",
- regex=False
+ regex=False,
):
+
class replaceable_func_override(GenericFunction):
type = DateTime
- identifier = 'REPLACEABLE_FUNC'
+ identifier = "REPLACEABLE_FUNC"
with testing.expect_deprecated(
"GenericFunction(s) '['REPLACEABLE_FUNC', 'Replaceable_Func']' "
"are already registered with different letter cases and might "
"interact with 'replaceable_func'. GenericFunction objects will "
"be fully case-insensitive in a future release.",
- regex=False
+ regex=False,
):
+
class replaceable_func_lowercase(GenericFunction):
type = String
- identifier = 'replaceable_func'
+ identifier = "replaceable_func"
with testing.expect_warnings(
"The GenericFunction 'Replaceable_Func' is already registered and "
"is going to be overriden.",
- regex=False
+ regex=False,
):
+
class Replaceable_Func_override(GenericFunction):
type = Integer
- identifier = 'Replaceable_Func'
+ identifier = "Replaceable_Func"
assert isinstance(func.REPLACEABLE_FUNC().type, DateTime)
assert isinstance(func.Replaceable_Func().type, Integer)
not_in_("REPLACEABLE_FUNC", reg)
not_in_("Replaceable_Func", reg)
in_("replaceable_func", cs_reg)
- eq_(set(cs_reg['replaceable_func'].keys()),
- set(['REPLACEABLE_FUNC', 'Replaceable_Func', 'replaceable_func']))
+ eq_(
+ set(cs_reg["replaceable_func"].keys()),
+ set(["REPLACEABLE_FUNC", "Replaceable_Func", "replaceable_func"]),
+ )
class DDLListenerDeprecationsTest(fixtures.TestBase):
)
def test_append_listener(self):
- metadata, table, bind = self.metadata, self.table, self.bind
+ metadata, table = self.metadata, self.table
def fn(*a):
return None
assert "fnord" in canary
def test_deprecated_append_ddl_listener_metadata(self):
- metadata, users, engine = self.metadata, self.users, self.engine
+ metadata, engine = self.metadata, self.engine
canary = []
with testing.expect_deprecated(".* is deprecated .*"):
metadata.append_ddl_listener(
with testing.expect_deprecated(
"The text.autocommit parameter is deprecated"
):
- t = text("select id, name from user", autocommit=True)
+ text("select id, name from user", autocommit=True)
assert str(clause2) == str(t1.join(t2, t1.c.col2 == t2.c.col3))
def test_aliased_column_adapt(self):
- clause = t1.select()
+ t1.select()
aliased = t1.select().alias()
aliased2 = t1.alias()
)
def test_table_to_alias_3(self):
- t1alias = t1.alias("t1alias")
- vis = sql_util.ClauseAdapter(t1alias)
- self.assert_compile(
- select([literal_column("*")], t1.c.col1 == t2.c.col2),
- "SELECT * FROM table1, table2 WHERE table1.col1 = table2.col2",
- )
-
- def test_table_to_alias_4(self):
t1alias = t1.alias("t1alias")
vis = sql_util.ClauseAdapter(t1alias)
self.assert_compile(
"WHERE t1alias.col1 = table2.col2",
)
- def test_table_to_alias_5(self):
+ def test_table_to_alias_4(self):
t1alias = t1.alias("t1alias")
vis = sql_util.ClauseAdapter(t1alias)
self.assert_compile(
"WHERE t1alias.col1 = table2.col2",
)
- def test_table_to_alias_6(self):
+ def test_table_to_alias_5(self):
t1alias = t1.alias("t1alias")
vis = sql_util.ClauseAdapter(t1alias)
self.assert_compile(
"(SELECT * FROM table2 WHERE t1alias.col1 = table2.col2)",
)
- def test_table_to_alias_7(self):
+ def test_table_to_alias_6(self):
t1alias = t1.alias("t1alias")
vis = sql_util.ClauseAdapter(t1alias)
self.assert_compile(
"WHERE t1alias.col1 = table2.col2)",
)
- def test_table_to_alias_8(self):
+ def test_table_to_alias_7(self):
t1alias = t1.alias("t1alias")
vis = sql_util.ClauseAdapter(t1alias)
self.assert_compile(
"t1alias.col2 ELSE t1alias.col1 END",
)
- def test_table_to_alias_9(self):
+ def test_table_to_alias_8(self):
t1alias = t1.alias("t1alias")
vis = sql_util.ClauseAdapter(t1alias)
self.assert_compile(
"t1alias.col2 ELSE t1alias.col1 END",
)
- def test_table_to_alias_10(self):
+ def test_table_to_alias_9(self):
s = select([literal_column("*")], from_obj=[t1]).alias("foo")
self.assert_compile(
s.select(), "SELECT foo.* FROM (SELECT * FROM table1) " "AS foo"
)
- def test_table_to_alias_11(self):
+ def test_table_to_alias_10(self):
s = select([literal_column("*")], from_obj=[t1]).alias("foo")
t1alias = t1.alias("t1alias")
vis = sql_util.ClauseAdapter(t1alias)
"SELECT foo.* FROM (SELECT * FROM table1 " "AS t1alias) AS foo",
)
- def test_table_to_alias_12(self):
+ def test_table_to_alias_11(self):
s = select([literal_column("*")], from_obj=[t1]).alias("foo")
self.assert_compile(
s.select(), "SELECT foo.* FROM (SELECT * FROM table1) " "AS foo"
)
- def test_table_to_alias_13(self):
+ def test_table_to_alias_12(self):
t1alias = t1.alias("t1alias")
vis = sql_util.ClauseAdapter(t1alias)
ff = vis.traverse(func.count(t1.c.col1).label("foo"))
# .col1).l abel('foo')]), clone=True), "SELECT
# count(t1alias.col1) AS foo FROM table1 AS t1alias")
- def test_table_to_alias_14(self):
+ def test_table_to_alias_13(self):
t1alias = t1.alias("t1alias")
vis = sql_util.ClauseAdapter(t1alias)
t2alias = t2.alias("t2alias")
"t2alias.col2",
)
- def test_table_to_alias_15(self):
+ def test_table_to_alias_14(self):
t1alias = t1.alias("t1alias")
vis = sql_util.ClauseAdapter(t1alias)
t2alias = t2.alias("t2alias")
"t2alias.col2",
)
- def test_table_to_alias_16(self):
+ def test_table_to_alias_15(self):
t1alias = t1.alias("t1alias")
vis = sql_util.ClauseAdapter(t1alias)
t2alias = t2.alias("t2alias")
"WHERE t1alias.col1 = t2alias.col2)",
)
- def test_table_to_alias_17(self):
+ def test_table_to_alias_16(self):
t1alias = t1.alias("t1alias")
vis = sql_util.ClauseAdapter(t1alias)
t2alias = t2.alias("t2alias")
"WHERE c.bid = anon_1.b_aid",
)
- t1 = table("table1", column("col1"), column("col2"), column("col3"))
- t2 = table("table2", column("col1"), column("col2"), column("col3"))
-
def test_label_anonymize_one(self):
t1a = t1.alias()
adapter = sql_util.ClauseAdapter(t1a, anonymize_labels=True)
a = Table("a", meta, Column("a", Integer))
a.append_constraint(ForeignKeyConstraint(["a"], ["b.x"]))
- b = Table("b", meta, Column("b", Integer))
+ Table("b", meta, Column("b", Integer))
assert_raises_message(
exc.NoReferencedColumnError,
def test_sequence_attach_to_table(self):
m1 = MetaData()
s1 = Sequence("s")
- t = Table("a", m1, Column("x", Integer, s1))
+ Table("a", m1, Column("x", Integer, s1))
assert s1.metadata is m1
def test_sequence_attach_to_existing_table(self):
type_ = self.WrapBoolean()
y = Column("y", type_)
y_copy = y.copy()
- t1 = Table("x", m, y_copy)
+ Table("x", m, y_copy)
is_true(y_copy.type._create_events)
def _no_error(self, col):
m = MetaData()
- b = Table("bar", m, Column("id", Integer))
+ Table("bar", m, Column("id", Integer))
t = Table("t", m, col)
schema.CreateTable(t).compile()
s1c1 = s1._clone()
s1c2 = s1._clone()
s2c1 = s2._clone()
- s2c2 = s2._clone()
s3c1 = s3._clone()
eq_(
def test_scalar_cloned_comparator(self):
sel = select([table1.c.col1]).as_scalar()
- expr = sel == table1.c.col1
+ sel == table1.c.col1
sel2 = visitors.ReplacingCloningVisitor().traverse(sel)
c1 = Column("c1", Integer)
c2 = Column("c2", Integer)
- s = select([c1])
+ select([c1])
t = Table("t", MetaData(), c1, c2)
c1 = Column("c1", Integer)
c2 = Column("c2", Integer)
- s = select([c1]).where(c1 == 5)
+ select([c1]).where(c1 == 5)
t = Table("t", MetaData(), c1, c2)
t1 = Table("t1", m, Column("x", Integer))
c1 = Column("c1", Integer)
- s = select([c1]).where(c1 == 5).select_from(t1)
+ select([c1]).where(c1 == 5).select_from(t1)
t2 = Table("t2", MetaData(), c1)
assert c1.label(None) is not c1
eq_(str(select([c1])), "SELECT count(:count_2) AS count_1")
- c2 = select([c1]).compile()
+ select([c1]).compile()
eq_(str(select([c1.label(None)])), "SELECT count(:count_2) AS count_1")
pass
col = MyColumn("x", Integer)
- binary_1 = col == 5
+ col == 5
col_anno = MyColumn("x", Integer)._annotate({"foo": "bar"})
binary_2 = col_anno == 5
eq_(binary_2.left._annotations, {"foo": "bar"})
c = column("a")
c1 = c._annotate({"foo": "bar"})
- comp1 = c1.comparator
+ c1.comparator
c2 = c1._annotate({"bat": "hoho"})
- comp2 = c2.comparator
+ c2.comparator
assert (c2 == 5).left._annotations == {"foo": "bar", "bat": "hoho"}
def test_select_table_alias_column(self):
t = self._fixture()
- x, y = t.c.x, t.c.y
+ x = t.c.x
ta = t.alias()
s = select([ta.c.x, ta.c.y])
def test_select_label_alt_name_table_alias_column(self):
t = self._fixture()
- x, y = t.c.x, t.c.y
+ x = t.c.x
ta = t.alias()
l1, l2 = ta.c.x.label("a"), ta.c.y.label("b")
def test_select_table_alias_column(self):
t = self._xy_table_fixture()
- x, y = t.c.x, t.c.y
+ x = t.c.x
ta = t.alias()
s = text("select ta.x, ta.y FROM t AS ta").columns(ta.c.x, ta.c.y)
def test_select_label_alt_name_table_alias_column(self):
t = self._xy_table_fixture()
- x, y = t.c.x, t.c.y
+ x = t.c.x
ta = t.alias()
l1, l2 = ta.c.x.label("a"), ta.c.y.label("b")