sql.text(
"SELECT username FROM user_db_links " "WHERE db_link=:link"
),
- link=dblink,
+ dict(link=dblink),
)
dblink = "@" + dblink
elif not owner:
"SELECT sequence_name FROM all_sequences "
"WHERE sequence_owner = :schema_name"
),
- schema_name=self.denormalize_name(schema),
+ dict(schema_name=self.denormalize_name(schema)),
)
return [self.normalize_name(row[0]) for row in cursor]
"WHERE n.nspname = :schema AND c.relkind IN (%s)"
% (", ".join("'%s'" % elem for elem in kinds))
).columns(relname=sqltypes.Unicode),
- schema=schema if schema is not None else self.default_schema_name,
+ dict(
+ schema=schema
+ if schema is not None
+ else self.default_schema_name
+ ),
)
return [name for name, in result]
"WHERE n.nspname = :schema AND c.relname = :view_name "
"AND c.relkind IN ('v', 'm')"
).columns(view_def=sqltypes.Unicode),
- schema=schema if schema is not None else self.default_schema_name,
- view_name=view_name,
+ dict(
+ schema=schema
+ if schema is not None
+ else self.default_schema_name,
+ view_name=view_name,
+ ),
)
return view_def
"where usename=current_user and pid != pg_backend_pid() "
"and datname=:dname"
),
- dname=ident,
+ dict(dname=ident),
)
conn.exec_driver_sql("DROP DATABASE %s" % ident)
upon a fixed set of classes.
"""
+
with mapperlib._CONFIGURE_MUTEX:
while _mapper_registry:
try:
def after_test():
-
if _fixture_sessions:
-
_close_all_sessions()
def test_autoincrement_on_insert(self, connection):
- connection.execute(self.tables.autoinc_pk.insert(), data="some data")
+ connection.execute(
+ self.tables.autoinc_pk.insert(), dict(data="some data")
+ )
self._assert_round_trip(self.tables.autoinc_pk, connection)
def test_last_inserted_id(self, connection):
r = connection.execute(
- self.tables.autoinc_pk.insert(), data="some data"
+ self.tables.autoinc_pk.insert(), dict(data="some data")
)
pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
eq_(r.inserted_primary_key, (pk,))
@requirements.dbapi_lastrowid
def test_native_lastrowid_autoinc(self, connection):
r = connection.execute(
- self.tables.autoinc_pk.insert(), data="some data"
+ self.tables.autoinc_pk.insert(), dict(data="some data")
)
lastrowid = r.lastrowid
pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
engine = config.db
with engine.begin() as conn:
- r = conn.execute(self.tables.autoinc_pk.insert(), data="some data")
+ r = conn.execute(
+ self.tables.autoinc_pk.insert(), dict(data="some data")
+ )
assert r._soft_closed
assert not r.closed
assert r.is_insert
@requirements.returning
def test_autoclose_on_insert_implicit_returning(self, connection):
r = connection.execute(
- self.tables.autoinc_pk.insert(), data="some data"
+ self.tables.autoinc_pk.insert(), dict(data="some data")
)
assert r._soft_closed
assert not r.closed
def test_explicit_returning_pk_autocommit(self, connection):
table = self.tables.autoinc_pk
r = connection.execute(
- table.insert().returning(table.c.id), data="some data"
+ table.insert().returning(table.c.id), dict(data="some data")
)
pk = r.first()[0]
fetched_pk = connection.scalar(select(table.c.id))
def test_explicit_returning_pk_no_autocommit(self, connection):
table = self.tables.autoinc_pk
r = connection.execute(
- table.insert().returning(table.c.id), data="some data"
+ table.insert().returning(table.c.id), dict(data="some data")
)
pk = r.first()[0]
fetched_pk = connection.scalar(select(table.c.id))
def test_autoincrement_on_insert_implicit_returning(self, connection):
- connection.execute(self.tables.autoinc_pk.insert(), data="some data")
+ connection.execute(
+ self.tables.autoinc_pk.insert(), dict(data="some data")
+ )
self._assert_round_trip(self.tables.autoinc_pk, connection)
def test_last_inserted_id_implicit_returning(self, connection):
r = connection.execute(
- self.tables.autoinc_pk.insert(), data="some data"
+ self.tables.autoinc_pk.insert(), dict(data="some data")
)
pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
eq_(r.inserted_primary_key, (pk,))
with self.sql_execution_asserter() as asserter:
with config.db.connect() as conn:
- conn.execute(stmt, q=10)
+ conn.execute(stmt, dict(q=10))
asserter.assert_(
CursorSQL(
with self.sql_execution_asserter() as asserter:
with config.db.connect() as conn:
- conn.execute(stmt, q=[5, 6, 7])
+ conn.execute(stmt, dict(q=[5, 6, 7]))
asserter.assert_(
CursorSQL(
with self.sql_execution_asserter() as asserter:
with config.db.connect() as conn:
- conn.execute(stmt, q=[(5, 10), (12, 18)])
+ conn.execute(stmt, dict(q=[(5, 10), (12, 18)]))
asserter.assert_(
CursorSQL(
with self.sql_execution_asserter() as asserter:
with config.db.connect() as conn:
- conn.execute(stmt, q=[(5, "z1"), (12, "z3")])
+ conn.execute(stmt, dict(q=[(5, "z1"), (12, "z3")]))
asserter.assert_(
CursorSQL(
)
def test_insert_roundtrip(self, connection):
- connection.execute(self.tables.seq_pk.insert(), data="some data")
+ connection.execute(self.tables.seq_pk.insert(), dict(data="some data"))
self._assert_round_trip(self.tables.seq_pk, connection)
def test_insert_lastrowid(self, connection):
- r = connection.execute(self.tables.seq_pk.insert(), data="some data")
+ r = connection.execute(
+ self.tables.seq_pk.insert(), dict(data="some data")
+ )
eq_(
r.inserted_primary_key, (testing.db.dialect.default_sequence_base,)
)
@requirements.sequences_optional
def test_optional_seq(self, connection):
r = connection.execute(
- self.tables.seq_opt_pk.insert(), data="some data"
+ self.tables.seq_opt_pk.insert(), dict(data="some data")
)
eq_(r.inserted_primary_key, (1,))
def test_update(self, connection):
t = self.tables.plain_pk
- r = connection.execute(t.update().where(t.c.id == 2), data="d2_new")
+ r = connection.execute(
+ t.update().where(t.c.id == 2), dict(data="d2_new")
+ )
assert not r.is_insert
assert not r.returns_rows
# Core execution
#
r"The (?:Executable|Engine)\.(?:execute|scalar)\(\) method",
- r"The connection.execute\(\) method in SQLAlchemy 2.0 will accept "
- "parameters as a single dictionary or a single sequence of "
- "dictionaries only.",
r"The Connection.connect\(\) method is considered legacy",
# r".*DefaultGenerator.execute\(\)",
#
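# 2.0-style parameter passing to Connection.execute(): a single dict per
# statement, or a list of dicts for executemany, rather than **kwargs /
# *multiparams. A minimal sketch; the table and column names here are
# illustrative only:
#
#     connection.execute(tbl.insert(), {"id": 1, "data": "d1"})
#     connection.execute(tbl.insert(), [{"data": "d1"}, {"data": "d2"}])
#     connection.execute(
#         select(tbl).where(tbl.c.id == bindparam("q")), {"q": 1}
#     )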
def test_insert_plain_param(self, connection):
conn = connection
cattable = self.tables.cattable
- conn.execute(cattable.insert(), id=5)
+ conn.execute(cattable.insert(), dict(id=5))
eq_(conn.scalar(select(cattable.c.id)), 5)
def test_insert_values_key_plain(self, connection):
__only_on__ = "mssql"
__backend__ = True
- @testing.provide_metadata
- def test_fetchid_trigger(self, connection):
+ def test_fetchid_trigger(self, metadata, connection):
# TODO: investigate test hang on mssql when connection fixture is used
"""
Verify identity return value on inserting to a trigger table.
# TODO: check whether this error also occurs with clients other
# than the SQL Server Native Client. Maybe an assert_raises
# test should be written.
- meta = self.metadata
+ meta = metadata
t1 = Table(
"t1",
meta,
# seems to work with all linux drivers + backend. not sure
# if windows drivers / servers have different behavior here.
meta.create_all(connection)
- r = connection.execute(t2.insert(), descr="hello")
+ r = connection.execute(t2.insert(), dict(descr="hello"))
eq_(r.inserted_primary_key, (200,))
- r = connection.execute(t1.insert(), descr="hello")
+ r = connection.execute(t1.insert(), dict(descr="hello"))
eq_(r.inserted_primary_key, (100,))
@testing.provide_metadata
def test_date_roundtrips(self, date_fixture, connection):
t, (d1, t1, d2) = date_fixture
connection.execute(
- t.insert(), adate=d1, adatetime=d2, atime1=t1, atime2=d2
+ t.insert(), dict(adate=d1, adatetime=d2, atime1=t1, atime2=d2)
)
row = connection.execute(t.select()).first()
sa.exc.DBAPIError,
connection.execute,
t.insert(),
- adatetimeoffset=dto_param_value,
+ dict(adatetimeoffset=dto_param_value),
)
return
connection.execute(
t.insert(),
- adatetimeoffset=dto_param_value,
+ dict(adatetimeoffset=dto_param_value),
)
row = connection.execute(t.select()).first()
expected = data
with engine.begin() as conn:
- conn.execute(binary_table.insert(), data=data)
+ conn.execute(binary_table.insert(), dict(data=data))
eq_(conn.scalar(select(binary_table.c.data)), expected)
conn.execute(binary_table.delete())
- conn.execute(binary_table.insert(), data=None)
+ conn.execute(binary_table.insert(), dict(data=None))
eq_(conn.scalar(select(binary_table.c.data)), None)
eq_(
outparam("y_out", Float),
outparam("z_out", String),
),
- x_in=5,
+ dict(x_in=5),
)
eq_(result.out_parameters, {"x_out": 10, "y_out": 75, "z_out": None})
assert isinstance(result.out_parameters["x_out"], int)
connection.execute(
select(t).where(t.c.foo.in_(bindparam("uid", expanding=True))),
- uid=[1, 2, 3],
+ dict(uid=[1, 2, 3]),
)
AND owner = :owner
AND constraint_type = 'P' """
),
- table_name=s_table.name.upper(),
- owner=testing.db.dialect.default_schema_name.upper(),
+ dict(
+ table_name=s_table.name.upper(),
+ owner=testing.db.dialect.default_schema_name.upper(),
+ ),
)
reflectedtable = inspect.tables[s_table.name]
t.create(connection)
connection.execute(
t.insert(),
- dict(id=1, data=v1),
- dict(id=2, data=v2),
- dict(id=3, data=v3),
+ [
+ dict(id=1, data=v1),
+ dict(id=2, data=v2),
+ dict(id=3, data=v3),
+ ],
)
eq_(
Column("data", oracle.RAW(35)),
)
metadata.create_all(connection)
- connection.execute(raw_table.insert(), id=1, data=b("ABCDEF"))
+ connection.execute(raw_table.insert(), dict(id=1, data=b("ABCDEF")))
eq_(connection.execute(raw_table.select()).first(), (1, b("ABCDEF")))
def test_reflect_nvarchar(self, metadata, connection):
t = Table("t", metadata, Column("data", oracle.LONG))
metadata.create_all(connection)
- connection.execute(t.insert(), data="xyz")
+ connection.execute(t.insert(), dict(data="xyz"))
eq_(connection.scalar(select(t.c.data)), "xyz")
def test_longstring(self, metadata, connection):
)
try:
t = Table("z_test", metadata, autoload_with=connection)
- connection.execute(t.insert(), id=1.0, add_user="foobar")
+ connection.execute(t.insert(), dict(id=1.0, add_user="foobar"))
assert connection.execute(t.select()).fetchall() == [(1, "foobar")]
finally:
exec_sql(connection, "DROP TABLE Z_TEST")
cls.stream = stream = file_.read(12000)
for i in range(1, 11):
- connection.execute(binary_table.insert(), id=i, data=stream)
+ connection.execute(binary_table.insert(), dict(id=i, data=stream))
def test_lobs_without_convert(self):
engine = testing_engine(options=dict(auto_convert_lobs=False))
)
t = Table("speedy_users", meta, autoload_with=connection)
r = connection.execute(
- t.insert(), user_name="user", user_password="lala"
+ t.insert(), dict(user_name="user", user_password="lala")
)
eq_(r.inserted_primary_key, (1,))
result = connection.execute(t.select()).fetchall()
conn.execute(
table.insert(),
- {"id": 31, "data": "d3"},
- {"id": 32, "data": "d4"},
+ [
+ {"id": 31, "data": "d3"},
+ {"id": 32, "data": "d4"},
+ ],
)
# executemany, uses SERIAL
- conn.execute(table.insert(), {"data": "d5"}, {"data": "d6"})
+ conn.execute(table.insert(), [{"data": "d5"}, {"data": "d6"}])
# single execute, explicit id, inline
eq_(r.inserted_primary_key, (5,))
conn.execute(
table.insert(),
- {"id": 31, "data": "d3"},
- {"id": 32, "data": "d4"},
+ [
+ {"id": 31, "data": "d3"},
+ {"id": 32, "data": "d4"},
+ ],
)
- conn.execute(table.insert(), {"data": "d5"}, {"data": "d6"})
+ conn.execute(table.insert(), [{"data": "d5"}, {"data": "d6"}])
conn.execute(table.insert().inline(), {"id": 33, "data": "d7"})
conn.execute(table.insert().inline(), {"data": "d8"})
conn.execute(
table.insert(),
- {"id": 31, "data": "d3"},
- {"id": 32, "data": "d4"},
+ [
+ {"id": 31, "data": "d3"},
+ {"id": 32, "data": "d4"},
+ ],
)
# executemany, uses SERIAL
- conn.execute(table.insert(), {"data": "d5"}, {"data": "d6"})
+ conn.execute(table.insert(), [{"data": "d5"}, {"data": "d6"}])
# single execute, explicit id, inline
eq_(r.inserted_primary_key, (5,))
conn.execute(
table.insert(),
- {"id": 31, "data": "d3"},
- {"id": 32, "data": "d4"},
+ [
+ {"id": 31, "data": "d3"},
+ {"id": 32, "data": "d4"},
+ ],
)
- conn.execute(table.insert(), {"data": "d5"}, {"data": "d6"})
+ conn.execute(table.insert(), [{"data": "d5"}, {"data": "d6"}])
conn.execute(table.insert().inline(), {"id": 33, "data": "d7"})
conn.execute(table.insert().inline(), {"data": "d8"})
conn.execute(table.insert(), {"data": "d2"})
conn.execute(
table.insert(),
- {"id": 31, "data": "d3"},
- {"id": 32, "data": "d4"},
+ [
+ {"id": 31, "data": "d3"},
+ {"id": 32, "data": "d4"},
+ ],
)
- conn.execute(table.insert(), {"data": "d5"}, {"data": "d6"})
+ conn.execute(table.insert(), [{"data": "d5"}, {"data": "d6"}])
conn.execute(table.insert().inline(), {"id": 33, "data": "d7"})
conn.execute(table.insert().inline(), {"data": "d8"})
conn.execute(table.insert(), {"data": "d2"})
conn.execute(
table.insert(),
- {"id": 31, "data": "d3"},
- {"id": 32, "data": "d4"},
+ [
+ {"id": 31, "data": "d3"},
+ {"id": 32, "data": "d4"},
+ ],
)
- conn.execute(table.insert(), {"data": "d5"}, {"data": "d6"})
+ conn.execute(table.insert(), [{"data": "d5"}, {"data": "d6"}])
conn.execute(table.insert().inline(), {"id": 33, "data": "d7"})
conn.execute(table.insert().inline(), {"data": "d8"})
)
metadata.create_all(connection)
connection.execute(
- t1.insert(), x=[5], y=[5], z=[6], q=[decimal.Decimal("6.4")]
+ t1.insert(), dict(x=[5], y=[5], z=[6], q=[decimal.Decimal("6.4")])
)
row = connection.execute(t1.select()).first()
eq_(row, ([5], [5], [6], [decimal.Decimal("6.4")]))
)
metadata.create_all(connection)
connection.execute(
- t1.insert(), x=[5], y=[5], z=[6], q=[decimal.Decimal("6.4")]
+ t1.insert(), dict(x=[5], y=[5], z=[6], q=[decimal.Decimal("6.4")])
)
row = connection.execute(t1.select()).first()
eq_(row, ([5], [5], [6], [decimal.Decimal("6.4")]))
)
t1.create(connection)
t1.create(connection, checkfirst=True) # check the create
- connection.execute(t1.insert(), value="two")
- connection.execute(t1.insert(), value="three")
- connection.execute(t1.insert(), value="three")
+ connection.execute(t1.insert(), dict(value="two"))
+ connection.execute(t1.insert(), dict(value="three"))
+ connection.execute(t1.insert(), dict(value="three"))
eq_(
connection.execute(t1.select().order_by(t1.c.id)).fetchall(),
[(1, "two"), (2, "three"), (3, "three")],
]
t1.create(conn, checkfirst=True)
- conn.execute(t1.insert(), value="two")
- conn.execute(t1.insert(), value="three")
- conn.execute(t1.insert(), value="three")
+ conn.execute(t1.insert(), dict(value="two"))
+ conn.execute(t1.insert(), dict(value="three"))
+ conn.execute(t1.insert(), dict(value="three"))
eq_(
conn.execute(t1.select().order_by(t1.c.id)).fetchall(),
[(1, "two"), (2, "three"), (3, "three")],
),
)
metadata.create_all(connection)
- connection.execute(t1.insert(), value=util.u("drôle"))
- connection.execute(t1.insert(), value=util.u("réveillé"))
- connection.execute(t1.insert(), value=util.u("S’il"))
+ connection.execute(t1.insert(), dict(value=util.u("drôle")))
+ connection.execute(t1.insert(), dict(value=util.u("réveillé")))
+ connection.execute(t1.insert(), dict(value=util.u("S’il")))
eq_(
connection.execute(t1.select().order_by(t1.c.id)).fetchall(),
[
somedate = connection.scalar(func.current_timestamp().select())
assert somedate.tzinfo
- connection.execute(tztable.insert(), id=1, name="row1", date=somedate)
+ connection.execute(
+ tztable.insert(), dict(id=1, name="row1", date=somedate)
+ )
row = connection.execute(
select(tztable.c.date).where(tztable.c.id == 1)
).first()
)
result = connection.execute(
tztable.update(tztable.c.id == 1).returning(tztable.c.date),
- name="newname",
+ dict(
+ name="newname",
+ ),
)
row = result.first()
assert row[0] >= somedate
somedate = datetime.datetime(2005, 10, 20, 11, 52, 0)
assert not somedate.tzinfo
connection.execute(
- notztable.insert(), id=1, name="row1", date=somedate
+ notztable.insert(), dict(id=1, name="row1", date=somedate)
)
row = connection.execute(
select(notztable.c.date).where(notztable.c.id == 1)
eq_(row[0].tzinfo, None)
result = connection.execute(
notztable.update(notztable.c.id == 1).returning(notztable.c.date),
- name="newname",
+ dict(
+ name="newname",
+ ),
)
row = result.first()
assert row[0] >= somedate
)
def _fixture_456(self, table, connection):
- connection.execute(table.insert(), intarr=[4, 5, 6])
+ connection.execute(table.insert(), dict(intarr=[4, 5, 6]))
def test_reflect_array_column(self, connection):
metadata2 = MetaData()
arrtable = self.tables.arrtable
connection.execute(
arrtable.insert(),
- intarr=[1, 2, 3],
- strarr=[util.u("abc"), util.u("def")],
+ dict(
+ intarr=[1, 2, 3],
+ strarr=[util.u("abc"), util.u("def")],
+ ),
)
results = connection.execute(arrtable.select()).fetchall()
eq_(len(results), 1)
arrtable = self.tables.arrtable
connection.execute(
arrtable.insert(),
- intarr=[1, None, 3],
- strarr=[util.u("abc"), None],
+ dict(
+ intarr=[1, None, 3],
+ strarr=[util.u("abc"), None],
+ ),
)
results = connection.execute(arrtable.select()).fetchall()
eq_(len(results), 1)
arrtable = self.tables.arrtable
connection.execute(
arrtable.insert(),
- intarr=[1, 2, 3],
- strarr=[util.u("abc"), util.u("def")],
+ dict(
+ intarr=[1, 2, 3],
+ strarr=[util.u("abc"), util.u("def")],
+ ),
)
connection.execute(
- arrtable.insert(), intarr=[4, 5, 6], strarr=util.u("ABC")
+ arrtable.insert(), dict(intarr=[4, 5, 6], strarr=util.u("ABC"))
)
results = connection.execute(
arrtable.select().where(arrtable.c.intarr == [1, 2, 3])
arrtable = self.tables.arrtable
connection.execute(
arrtable.insert(),
- intarr=[1, 2, 3],
- strarr=[util.u("abc"), util.u("def")],
+ dict(intarr=[1, 2, 3], strarr=[util.u("abc"), util.u("def")]),
)
results = connection.execute(
select(arrtable.c.intarr + [4, 5, 6])
arrtable = self.tables.arrtable
connection.execute(
arrtable.insert(),
- id=5,
- intarr=[1, 2, 3],
- strarr=[util.u("abc"), util.u("def")],
+ dict(
+ id=5, intarr=[1, 2, 3], strarr=[util.u("abc"), util.u("def")]
+ ),
)
results = connection.execute(
select(arrtable.c.id).where(arrtable.c.intarr < [4, 5, 6])
arrtable = self.tables.arrtable
connection.execute(
arrtable.insert(),
- intarr=[4, 5, 6],
- strarr=[[util.ue("m\xe4\xe4")], [util.ue("m\xf6\xf6")]],
+ dict(
+ intarr=[4, 5, 6],
+ strarr=[[util.ue("m\xe4\xe4")], [util.ue("m\xf6\xf6")]],
+ ),
)
connection.execute(
arrtable.insert(),
- intarr=[1, 2, 3],
- strarr=[util.ue("m\xe4\xe4"), util.ue("m\xf6\xf6")],
+ dict(
+ intarr=[1, 2, 3],
+ strarr=[util.ue("m\xe4\xe4"), util.ue("m\xf6\xf6")],
+ ),
)
results = connection.execute(
arrtable.select(order_by=[arrtable.c.intarr])
arrtable = self.tables.arrtable
connection.execute(
arrtable.insert(),
- intarr=[4, 5, 6],
- strarr=[util.u("abc"), util.u("def")],
+ dict(
+ intarr=[4, 5, 6],
+ strarr=[util.u("abc"), util.u("def")],
+ ),
)
eq_(connection.scalar(select(arrtable.c.intarr[2:3])), [5, 6])
connection.execute(
def test_multi_dim_roundtrip(self, connection):
arrtable = self.tables.arrtable
- connection.execute(arrtable.insert(), dimarr=[[1, 2, 3], [4, 5, 6]])
+ connection.execute(
+ arrtable.insert(), dict(dimarr=[[1, 2, 3], [4, 5, 6]])
+ )
eq_(
connection.scalar(select(arrtable.c.dimarr)),
[[-1, 0, 1], [2, 3, 4]],
def test_array_any_exec(self, connection):
arrtable = self.tables.arrtable
- connection.execute(arrtable.insert(), intarr=[4, 5, 6])
+ connection.execute(arrtable.insert(), dict(intarr=[4, 5, 6]))
eq_(
connection.scalar(
select(arrtable.c.intarr).where(
def test_array_all_exec(self, connection):
arrtable = self.tables.arrtable
- connection.execute(arrtable.insert(), intarr=[4, 5, 6])
+ connection.execute(arrtable.insert(), dict(intarr=[4, 5, 6]))
eq_(
connection.scalar(
select(arrtable.c.intarr).where(
)
metadata.create_all(connection)
connection.execute(
- t1.insert(), id=1, data=["1", "2", "3"], data2=[5.4, 5.6]
+ t1.insert(), dict(id=1, data=["1", "2", "3"], data2=[5.4, 5.6])
)
connection.execute(
- t1.insert(), id=2, data=["4", "5", "6"], data2=[1.0]
+ t1.insert(), dict(id=2, data=["4", "5", "6"], data2=[1.0])
)
connection.execute(
t1.insert(),
- id=3,
- data=[["4", "5"], ["6", "7"]],
- data2=[[5.4, 5.6], [1.0, 1.1]],
+ dict(
+ id=3,
+ data=[["4", "5"], ["6", "7"]],
+ data2=[[5.4, 5.6], [1.0, 1.1]],
+ ),
)
r = connection.execute(t1.select().order_by(t1.c.id)).fetchall()
def test_array_contained_by_exec(self, connection):
arrtable = self.tables.arrtable
- connection.execute(arrtable.insert(), intarr=[6, 5, 4])
+ connection.execute(arrtable.insert(), dict(intarr=[6, 5, 4]))
eq_(
connection.scalar(
select(arrtable.c.intarr.contained_by([4, 5, 6, 7]))
def test_array_overlap_exec(self, connection):
arrtable = self.tables.arrtable
- connection.execute(arrtable.insert(), intarr=[4, 5, 6])
+ connection.execute(arrtable.insert(), dict(intarr=[4, 5, 6]))
eq_(
connection.scalar(
select(arrtable.c.intarr).where(
def test_tsvector_round_trip(self, connection, metadata):
t = Table("t1", metadata, Column("data", postgresql.TSVECTOR))
t.create(connection)
- connection.execute(t.insert(), data="a fat cat sat")
+ connection.execute(t.insert(), dict(data="a fat cat sat"))
eq_(connection.scalar(select(t.c.data)), "'a' 'cat' 'fat' 'sat'")
- connection.execute(t.update(), data="'a' 'cat' 'fat' 'mat' 'sat'")
+ connection.execute(
+ t.update(), dict(data="'a' 'cat' 'fat' 'mat' 'sat'")
+ )
eq_(
connection.scalar(select(t.c.data)),
data_table = self.tables.data_table
connection.execute(
data_table.insert(),
- {"name": "r1", "data": {"k1": "r1v1", "k2": "r1v2"}},
- {"name": "r2", "data": {"k1": "r2v1", "k2": "r2v2"}},
- {"name": "r3", "data": {"k1": "r3v1", "k2": "r3v2"}},
- {"name": "r4", "data": {"k1": "r4v1", "k2": "r4v2"}},
- {"name": "r5", "data": {"k1": "r5v1", "k2": "r5v2"}},
+ [
+ {"name": "r1", "data": {"k1": "r1v1", "k2": "r1v2"}},
+ {"name": "r2", "data": {"k1": "r2v1", "k2": "r2v2"}},
+ {"name": "r3", "data": {"k1": "r3v1", "k2": "r3v2"}},
+ {"name": "r4", "data": {"k1": "r4v1", "k2": "r4v2"}},
+ {"name": "r5", "data": {"k1": "r5v1", "k2": "r5v2"}},
+ ],
)
def _assert_data(self, compare, conn):
exc.StatementError,
connection.execute,
select(1).where(bindparam("date", type_=Date)),
- date=str(datetime.date(2007, 10, 30)),
+ dict(date=str(datetime.date(2007, 10, 30))),
)
def test_cant_parse_datetime_message(self, connection):
value = {"json": {"foo": "bar"}, "recs": ["one", "two"]}
- connection.execute(sqlite_json.insert(), foo=value)
+ connection.execute(sqlite_json.insert(), dict(foo=value))
eq_(connection.scalar(select(sqlite_json.c.foo)), value)
value = {"json": {"foo": "bar"}}
- connection.execute(sqlite_json.insert(), foo=value)
+ connection.execute(sqlite_json.insert(), dict(foo=value))
eq_(
connection.scalar(select(sqlite_json.c.foo["json"])), value["json"]
users = self.tables.users
connection = self.bind.connect()
transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
trans2 = connection.begin_nested()
- connection.execute(users.insert(), user_id=2, user_name="user2")
+ connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
trans2.rollback()
- connection.execute(users.insert(), user_id=3, user_name="user3")
+ connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
transaction.commit()
eq_(
connection.execute(
users = self.tables.users
connection = self.bind.connect()
transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
trans2 = connection.begin_nested()
- connection.execute(users.insert(), user_id=2, user_name="user2")
+ connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
trans2.commit()
- connection.execute(users.insert(), user_id=3, user_name="user3")
+ connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
transaction.commit()
eq_(
connection.execute(
engine = create_engine("sqlite:///")
cx = engine.connect()
cx.begin()
- table = self.users
ddl = DDL("SELECT 1")
- for spec in (
- (cx.execute, ddl),
- (cx.execute, ddl, table),
- ):
- fn = spec[0]
- arg = spec[1:]
- r = fn(*arg)
- eq_(list(r), [(1,)])
+ r = cx.execute(ddl)
+ eq_(list(r), [(1,)])
def test_platform_escape(self):
"""test the escaping of % characters in the DDL construct."""
import sqlalchemy as tsa
import sqlalchemy as sa
+from sqlalchemy import bindparam
from sqlalchemy import create_engine
from sqlalchemy import DDL
from sqlalchemy import engine
connection.begin()
branched = connection.connect()
assert branched.in_transaction()
- branched.execute(users.insert(), user_id=1, user_name="user1")
+ branched.execute(users.insert(), dict(user_id=1, user_name="user1"))
with testing.expect_deprecated_20(
r"Calling .begin\(\) when a transaction is already "
"begun, creating a 'sub' transaction"
):
nested = branched.begin()
- branched.execute(users.insert(), user_id=2, user_name="user2")
+ branched.execute(users.insert(), dict(user_id=2, user_name="user2"))
nested.rollback()
assert not connection.in_transaction()
connection = local_connection
users = self.tables.users
transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
trans2 = connection.begin_nested()
- connection.execute(users.insert(), user_id=2, user_name="user2")
+ connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
with testing.expect_deprecated_20(
r"Calling .begin\(\) when a transaction is already "
"begun, creating a 'sub' transaction"
):
trans3 = connection.begin()
- connection.execute(users.insert(), user_id=3, user_name="user3")
+ connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
trans3.rollback()
assert_raises_message(
trans2.rollback()
assert connection._nested_transaction is None
- connection.execute(users.insert(), user_id=4, user_name="user4")
+ connection.execute(users.insert(), dict(user_id=4, user_name="user4"))
transaction.commit()
eq_(
connection.execute(
connection = local_connection
users = self.tables.users
transaction = connection.begin_twophase()
- connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
with testing.expect_deprecated_20(
r"Calling .begin\(\) when a transaction is already "
"begun, creating a 'sub' transaction"
):
transaction2 = connection.begin()
- connection.execute(users.insert(), user_id=2, user_name="user2")
+ connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
transaction3 = connection.begin_nested()
- connection.execute(users.insert(), user_id=3, user_name="user3")
+ connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
with testing.expect_deprecated_20(
r"Calling .begin\(\) when a transaction is already "
"begun, creating a 'sub' transaction"
):
transaction4 = connection.begin()
- connection.execute(users.insert(), user_id=4, user_name="user4")
+ connection.execute(users.insert(), dict(user_id=4, user_name="user4"))
transaction4.commit()
transaction3.rollback()
- connection.execute(users.insert(), user_id=5, user_name="user5")
+ connection.execute(users.insert(), dict(user_id=5, user_name="user5"))
transaction2.commit()
transaction.prepare()
transaction.commit()
connection = local_connection
users = self.tables.users
transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
- connection.execute(users.insert(), user_id=2, user_name="user2")
- connection.execute(users.insert(), user_id=3, user_name="user3")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
+ connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
+ connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
with testing.expect_deprecated_20(
r"Calling .begin\(\) when a transaction is already "
"begun, creating a 'sub' transaction"
):
trans2 = connection.begin()
- connection.execute(users.insert(), user_id=4, user_name="user4")
- connection.execute(users.insert(), user_id=5, user_name="user5")
+ connection.execute(users.insert(), dict(user_id=4, user_name="user4"))
+ connection.execute(users.insert(), dict(user_id=5, user_name="user5"))
assert connection.in_transaction()
trans2.close()
assert connection.in_transaction()
connection = local_connection
users = self.tables.users
transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
- connection.execute(users.insert(), user_id=2, user_name="user2")
- connection.execute(users.insert(), user_id=3, user_name="user3")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
+ connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
+ connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
with testing.expect_deprecated_20(
r"Calling .begin\(\) when a transaction is already "
"begun, creating a 'sub' transaction"
):
trans2 = connection.begin()
- connection.execute(users.insert(), user_id=4, user_name="user4")
- connection.execute(users.insert(), user_id=5, user_name="user5")
+ connection.execute(users.insert(), dict(user_id=4, user_name="user4"))
+ connection.execute(users.insert(), dict(user_id=5, user_name="user5"))
assert connection.in_transaction()
trans2.close()
assert connection.in_transaction()
transaction = connection.begin()
try:
connection.execute(
- users.insert(), user_id=1, user_name="user1"
+ users.insert(), dict(user_id=1, user_name="user1")
)
connection.execute(
- users.insert(), user_id=2, user_name="user2"
+ users.insert(), dict(user_id=2, user_name="user2")
)
connection.execute(
- users.insert(), user_id=3, user_name="user3"
+ users.insert(), dict(user_id=3, user_name="user3")
)
with testing.expect_deprecated_20(
r"Calling .begin\(\) when a transaction is already "
trans2 = connection.begin()
try:
connection.execute(
- users.insert(), user_id=4, user_name="user4"
+ users.insert(), dict(user_id=4, user_name="user4")
)
connection.execute(
- users.insert(), user_id=5, user_name="user5"
+ users.insert(), dict(user_id=5, user_name="user5")
)
raise Exception("uh oh")
trans2.commit()
connection = local_connection
users = self.tables.users
transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
- connection.execute(users.insert(), user_id=2, user_name="user2")
- connection.execute(users.insert(), user_id=3, user_name="user3")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
+ connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
+ connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
with testing.expect_deprecated_20(
r"Calling .begin\(\) when a transaction is already "
"begun, creating a 'sub' transaction"
):
trans2 = connection.begin()
- connection.execute(users.insert(), user_id=4, user_name="user4")
- connection.execute(users.insert(), user_id=5, user_name="user5")
+ connection.execute(users.insert(), dict(user_id=4, user_name="user4"))
+ connection.execute(users.insert(), dict(user_id=5, user_name="user5"))
trans2.commit()
transaction.rollback()
self.assert_(
trans = connection.begin()
branched = connection.connect()
assert branched.in_transaction()
- branched.execute(users.insert(), user_id=1, user_name="user1")
+ branched.execute(users.insert(), dict(user_id=1, user_name="user1"))
nested = branched.begin_nested()
- branched.execute(users.insert(), user_id=2, user_name="user2")
+ branched.execute(users.insert(), dict(user_id=2, user_name="user2"))
nested.rollback()
assert connection.in_transaction()
trans.commit()
r"The current statement is being autocommitted using "
"implicit autocommit"
):
- branched.execute(users.insert(), user_id=1, user_name="user1")
+ branched.execute(
+ users.insert(), dict(user_id=1, user_name="user1")
+ )
nested = branched.begin_twophase()
- branched.execute(users.insert(), user_id=2, user_name="user2")
+ branched.execute(users.insert(), dict(user_id=2, user_name="user2"))
nested.rollback()
assert not connection.in_transaction()
eq_(
):
break
+ @testing.combinations(
+ ((), {"z": 10}, [], {"z": 10}, testing.requires.legacy_engine),
+ )
+ def test_modify_parameters_from_event_one(
+ self, multiparams, params, expected_multiparams, expected_params
+ ):
+ # this is testing both the normalization added to parameters
+ # as of I97cb4d06adfcc6b889f10d01cc7775925cffb116 as well as
+ # that the return value from the event is taken as the new set
+ # of parameters.
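+ # with retval=True, the (clauseelement, multiparams, params) tuple
+ # returned by before_execute replaces what is actually executed;
+ # without retval=True the return value would be ignored.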
+ def before_execute(
+ conn, clauseelement, multiparams, params, execution_options
+ ):
+ eq_(multiparams, expected_multiparams)
+ eq_(params, expected_params)
+ return clauseelement, (), {"q": "15"}
+
+ def after_execute(
+ conn, clauseelement, multiparams, params, result, execution_options
+ ):
+ eq_(multiparams, ())
+ eq_(params, {"q": "15"})
+
+ e1 = testing_engine(config.db_url)
+ event.listen(e1, "before_execute", before_execute, retval=True)
+ event.listen(e1, "after_execute", after_execute)
+
+ with e1.connect() as conn:
+ with testing.expect_deprecated_20(
+ r"The connection\.execute\(\) method"
+ ):
+ result = conn.execute(
+ select(bindparam("q", type_=String)),
+ *multiparams,
+ **params
+ )
+ eq_(result.all(), [("15",)])
+
def test_retval_flag(self):
canary = []
[{"x": 5, "y": 10}, {"x": 8, "y": 9}],
{},
),
- ((), {"z": 10}, [], {"z": 10}, testing.requires.legacy_engine),
(({"z": 10},), {}, [], {"z": 10}),
argnames="multiparams, params, expected_multiparams, expected_params",
)
users = self.tables.users
connection = local_connection
transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
transaction.commit()
transaction = connection.begin()
- connection.execute(users.insert(), user_id=2, user_name="user2")
- connection.execute(users.insert(), user_id=3, user_name="user3")
+ connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
+ connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
transaction.commit()
transaction = connection.begin()
users = self.tables.users
connection = local_connection
transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
- connection.execute(users.insert(), user_id=2, user_name="user2")
- connection.execute(users.insert(), user_id=3, user_name="user3")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
+ connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
+ connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
transaction.rollback()
result = connection.exec_driver_sql("select * from users")
assert len(result.fetchall()) == 0
transaction = connection.begin()
try:
- connection.execute(users.insert(), user_id=1, user_name="user1")
- connection.execute(users.insert(), user_id=2, user_name="user2")
- connection.execute(users.insert(), user_id=1, user_name="user3")
+ connection.execute(
+ users.insert(), dict(user_id=1, user_name="user1")
+ )
+ connection.execute(
+ users.insert(), dict(user_id=2, user_name="user2")
+ )
+ connection.execute(
+ users.insert(), dict(user_id=1, user_name="user3")
+ )
transaction.commit()
assert False
except Exception as e:
connection = local_connection
users = self.tables.users
transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
conn2 = connection.execution_options(dummy=True)
- conn2.execute(users.insert(), user_id=2, user_name="user2")
+ conn2.execute(users.insert(), dict(user_id=2, user_name="user2"))
transaction.rollback()
eq_(
connection.exec_driver_sql("select count(*) from users").scalar(),
connection = local_connection
users = self.tables.users
trans = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
- connection.execute(users.insert(), user_id=2, user_name="user2")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
+ connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
try:
- connection.execute(users.insert(), user_id=2, user_name="user2.5")
+ connection.execute(
+ users.insert(), dict(user_id=2, user_name="user2.5")
+ )
except Exception:
trans.__exit__(*sys.exc_info())
)
trans = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
trans.__exit__(None, None, None)
assert not trans.is_active
self.assert_(
connection = local_connection
users = self.tables.users
transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
- connection.execute(users.insert(), user_id=2, user_name="user2")
- connection.execute(users.insert(), user_id=3, user_name="user3")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
+ connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
+ connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
assert connection.in_transaction()
transaction.commit()
assert not connection.in_transaction()
connection = local_connection
users = self.tables.users
transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
- connection.execute(users.insert(), user_id=2, user_name="user2")
- connection.execute(users.insert(), user_id=3, user_name="user3")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
+ connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
+ connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
assert connection.in_transaction()
transaction.close()
assert not connection.in_transaction()
connection = local_connection
users = self.tables.users
transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
trans2 = connection.begin_nested()
- connection.execute(users.insert(), user_id=2, user_name="user2")
+ connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
trans2.rollback()
- connection.execute(users.insert(), user_id=3, user_name="user3")
+ connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
transaction.commit()
eq_(
connection.execute(
connection = local_connection
users = self.tables.users
transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
trans2 = connection.begin_nested()
- connection.execute(users.insert(), user_id=2, user_name="user2")
+ connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
trans2.commit()
- connection.execute(users.insert(), user_id=3, user_name="user3")
+ connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
transaction.commit()
eq_(
connection.execute(
connection = local_connection
users = self.tables.users
transaction = connection.begin_twophase()
- connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
transaction.prepare()
transaction.commit()
transaction = connection.begin_twophase()
- connection.execute(users.insert(), user_id=2, user_name="user2")
+ connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
transaction.commit()
transaction.close()
transaction = connection.begin_twophase()
- connection.execute(users.insert(), user_id=3, user_name="user3")
+ connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
transaction.rollback()
transaction = connection.begin_twophase()
- connection.execute(users.insert(), user_id=4, user_name="user4")
+ connection.execute(users.insert(), dict(user_id=4, user_name="user4"))
transaction.prepare()
transaction.rollback()
transaction.close()
connection = testing.db.connect()
transaction = connection.begin_twophase()
- connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
+ connection.execute(
+ users.insert(), dict(user_id=1, user_name="user1")
+ )
transaction.prepare()
connection.invalidate()
conn = local_connection
users = self.tables.users
xa = conn.begin_twophase()
- conn.execute(users.insert(), user_id=1, user_name="user1")
+ conn.execute(users.insert(), dict(user_id=1, user_name="user1"))
xa.prepare()
xa.commit()
xa = conn.begin_twophase()
- conn.execute(users.insert(), user_id=2, user_name="user2")
+ conn.execute(users.insert(), dict(user_id=2, user_name="user2"))
xa.prepare()
xa.rollback()
xa = conn.begin_twophase()
- conn.execute(users.insert(), user_id=3, user_name="user3")
+ conn.execute(users.insert(), dict(user_id=3, user_name="user3"))
xa.rollback()
xa = conn.begin_twophase()
- conn.execute(users.insert(), user_id=4, user_name="user4")
+ conn.execute(users.insert(), dict(user_id=4, user_name="user4"))
xa.prepare()
xa.commit()
result = conn.execute(
rec = conn.connection._connection_record
raw_dbapi_con = rec.connection
conn.begin_twophase()
- conn.execute(users.insert(), user_id=1, user_name="user1")
+ conn.execute(users.insert(), dict(user_id=1, user_name="user1"))
assert rec.connection is raw_dbapi_con
__requires__ = ("deferrable_fks",)
__only_on__ = ("postgresql+psycopg2",) # needs #5824 for asyncpg
+ # this test has a lot of problems; asyncpg issues are being investigated
+ # separately. just get this legacy use case to pass for now.
+ __only_on__ = ("postgresql+psycopg2",)
+
@classmethod
def define_tables(cls, metadata):
Table("t1", metadata, Column("id", Integer, primary_key=True))
def test_close_transaction_on_commit_fail(self):
T2 = self.classes.T2
- session = fixture_session(autocommit=True)
+ session = Session(testing.db, autocommit=True)
# with a deferred constraint, this fails at COMMIT time instead
# of at INSERT time.
connection.execute(
users.insert(),
- {"id": 1, "name": "user1"},
- {"id": 2, "name": "user2"},
- {"id": 3, "name": "user3"},
+ [
+ {"id": 1, "name": "user1"},
+ {"id": 2, "name": "user2"},
+ {"id": 3, "name": "user3"},
+ ],
)
connection.execute(
stuff.insert(),
- {"id": 1, "user_id": 1, "date": datetime.date(2007, 10, 15)},
- {"id": 2, "user_id": 1, "date": datetime.date(2007, 12, 15)},
- {"id": 3, "user_id": 1, "date": datetime.date(2007, 11, 15)},
- {"id": 4, "user_id": 2, "date": datetime.date(2008, 1, 15)},
- {"id": 5, "user_id": 3, "date": datetime.date(2007, 6, 15)},
- {"id": 6, "user_id": 3, "date": datetime.date(2007, 3, 15)},
+ [
+ {"id": 1, "user_id": 1, "date": datetime.date(2007, 10, 15)},
+ {"id": 2, "user_id": 1, "date": datetime.date(2007, 12, 15)},
+ {"id": 3, "user_id": 1, "date": datetime.date(2007, 11, 15)},
+ {"id": 4, "user_id": 2, "date": datetime.date(2008, 1, 15)},
+ {"id": 5, "user_id": 3, "date": datetime.date(2007, 6, 15)},
+ {"id": 6, "user_id": 3, "date": datetime.date(2007, 3, 15)},
+ ],
)
def test_labeled_on_date_noalias(self):
connection.execute(
people.insert(),
- {"person_id": 1, "name": "person1", "type": "person"},
- {"person_id": 2, "name": "engineer1", "type": "engineer"},
- {"person_id": 3, "name": "engineer2", "type": "engineer"},
+ [
+ {"person_id": 1, "name": "person1", "type": "person"},
+ {"person_id": 2, "name": "engineer1", "type": "engineer"},
+ {"person_id": 3, "name": "engineer2", "type": "engineer"},
+ ],
)
connection.execute(
engineers.insert(),
- {"person_id": 2, "status": "new engineer"},
- {"person_id": 3, "status": "old engineer"},
+ [
+ {"person_id": 2, "status": "new engineer"},
+ {"person_id": 3, "status": "old engineer"},
+ ],
)
@classmethod
connection.execute(
user_t.insert(),
- {"id": 1, "name": "user1"},
- {"id": 2, "name": "user2"},
- {"id": 3, "name": "user3"},
+ [
+ {"id": 1, "name": "user1"},
+ {"id": 2, "name": "user2"},
+ {"id": 3, "name": "user3"},
+ ],
)
connection.execute(
stuff.insert(),
- {"id": 1, "user_id": 1, "date": datetime.date(2007, 10, 15)},
- {"id": 2, "user_id": 1, "date": datetime.date(2007, 12, 15)},
- {"id": 3, "user_id": 1, "date": datetime.date(2007, 11, 15)},
- {"id": 4, "user_id": 2, "date": datetime.date(2008, 1, 15)},
- {"id": 5, "user_id": 3, "date": datetime.date(2007, 6, 15)},
+ [
+ {"id": 1, "user_id": 1, "date": datetime.date(2007, 10, 15)},
+ {"id": 2, "user_id": 1, "date": datetime.date(2007, 12, 15)},
+ {"id": 3, "user_id": 1, "date": datetime.date(2007, 11, 15)},
+ {"id": 4, "user_id": 2, "date": datetime.date(2008, 1, 15)},
+ {"id": 5, "user_id": 3, "date": datetime.date(2007, 6, 15)},
+ ],
)
def test_correlated_lazyload(self):
conn.execute(
person.insert(),
- {"id": 1, "city_id": 1},
- {"id": 2, "city_id": 1},
+ [
+ {"id": 1, "city_id": 1},
+ {"id": 2, "city_id": 1},
+ ],
)
conn.execute(city.insert(), {"id": 2, "deleted": True})
)
with testing.db.begin() as conn:
- conn.execute(users.insert(), username="jack", fullname="jack")
- conn.execute(addresses.insert(), email="jack1", username="jack")
- conn.execute(addresses.insert(), email="jack2", username="jack")
+ conn.execute(
+ users.insert(), dict(username="jack", fullname="jack")
+ )
+ conn.execute(
+ addresses.insert(), dict(email="jack1", username="jack")
+ )
+ conn.execute(
+ addresses.insert(), dict(email="jack2", username="jack")
+ )
mapper(User, users)
mapper(
options={"implicit_returning": returning}
)
with engine.begin() as conn:
- conn.execute(t2.insert(), nextid=1)
- r = conn.execute(t1.insert(), data="hi")
+ conn.execute(t2.insert(), dict(nextid=1))
+ r = conn.execute(t1.insert(), dict(data="hi"))
eq_((1,), r.inserted_primary_key)
- conn.execute(t2.insert(), nextid=2)
- r = conn.execute(t1.insert(), data="there")
+ conn.execute(t2.insert(), dict(nextid=2))
+ r = conn.execute(t1.insert(), dict(data="there"))
eq_((2,), r.inserted_primary_key)
r = conn.execute(date_table.insert())
aitable = self.tables.aitable
ids = set()
- rs = connection.execute(aitable.insert(), int1=1)
+ rs = connection.execute(aitable.insert(), dict(int1=1))
last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
ids.add(last)
- rs = connection.execute(aitable.insert(), str1="row 2")
+ rs = connection.execute(aitable.insert(), dict(str1="row 2"))
last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
ids.add(last)
- rs = connection.execute(aitable.insert(), int1=3, str1="row 3")
+ rs = connection.execute(aitable.insert(), dict(int1=3, str1="row 3"))
last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
__backend__ = True
@testing.fails_on("oracle", "FIXME: unknown")
- @testing.provide_metadata
- def test_empty_insert(self, connection):
+ def test_empty_insert(self, metadata, connection):
t1 = Table(
"t1",
- self.metadata,
+ metadata,
Column("is_true", Boolean, server_default=("1")),
)
- self.metadata.create_all(connection)
+ metadata.create_all(connection)
connection.execute(t1.insert())
eq_(
1,
conn.execute(cls.tables.keyed2.insert(), dict(a="a2", b="b2"))
conn.execute(cls.tables.keyed3.insert(), dict(a="a3", d="d3"))
conn.execute(cls.tables.keyed4.insert(), dict(b="b4", q="q4"))
- conn.execute(cls.tables.content.insert(), type="t1")
+ conn.execute(cls.tables.content.insert(), dict(type="t1"))
if testing.requires.schemas.enabled:
conn.execute(
connection.execute(
users.insert(),
- dict(user_id=1, user_name="john"),
- dict(user_id=2, user_name="jack"),
+ [
+ dict(user_id=1, user_name="john"),
+ dict(user_id=2, user_name="jack"),
+ ],
)
def test_column_accessor_textual_select(self, connection):
conn.execute(users.delete())
conn.execute(
users.insert(),
- {"user_id": 7, "user_name": "jack"},
- {"user_id": 8, "user_name": "ed"},
- {"user_id": 9, "user_name": "fred"},
+ [
+ {"user_id": 7, "user_name": "jack"},
+ {"user_id": 8, "user_name": "ed"},
+ {"user_id": 9, "user_name": "fred"},
+ ],
)
for pickle in False, True:
connection.execute(t2.insert().values(value=func.length("one")))
connection.execute(
t2.insert().values(value=func.length("asfda") + -19),
- stuff="hi",
+ dict(stuff="hi"),
)
res = sorted(connection.execute(select(t2.c.value, t2.c.stuff)))
connection.execute(
t2.update().values(value=func.length("asdsafasd")),
- stuff="some stuff",
+ dict(stuff="some stuff"),
)
eq_(
connection.execute(select(t2.c.value, t2.c.stuff)).fetchall(),
# a length check on all subsequent parameters.
connection.execute(
users.insert(),
- {"user_id": 7},
- {"user_id": 8, "user_name": "ed"},
- {"user_id": 9},
+ [
+ {"user_id": 7},
+ {"user_id": 8, "user_name": "ed"},
+ {"user_id": 9},
+ ],
)
def _test_lastrow_accessor(self, table_, values, assertvalues):
is_(bool(comp.returning), True)
with engine.begin() as connection:
- result = connection.execute(table_.insert(), **values)
+ result = connection.execute(table_.insert(), values)
ret = values.copy()
for col, id_ in zip(
connection.execute(
users.insert(),
- {"user_id": 7, "user_name": "jack"},
- {"user_id": 8, "user_name": "ed"},
- {"user_id": 9, "user_name": "fred"},
+ [
+ {"user_id": 7, "user_name": "jack"},
+ {"user_id": 8, "user_name": "ed"},
+ {"user_id": 9, "user_name": "fred"},
+ ],
)
concat = ("test: " + users.c.user_name).label("thedata")
users = self.tables.users
connection.execute(
users.insert(),
- {"user_id": 7, "user_name": "jack"},
- {"user_id": 8, "user_name": "ed"},
- {"user_id": 9, "user_name": "fred"},
+ [
+ {"user_id": 7, "user_name": "jack"},
+ {"user_id": 8, "user_name": "ed"},
+ {"user_id": 9, "user_name": "fred"},
+ ],
)
concat = ("test: " + users.c.user_name).label("thedata")
users = self.tables.users
connection.execute(
users.insert(),
- {"user_id": 1, "user_name": "apples"},
- {"user_id": 2, "user_name": "oranges"},
- {"user_id": 3, "user_name": "bananas"},
- {"user_id": 4, "user_name": "legumes"},
- {"user_id": 5, "user_name": "hi % there"},
+ [
+ {"user_id": 1, "user_name": "apples"},
+ {"user_id": 2, "user_name": "oranges"},
+ {"user_id": 3, "user_name": "bananas"},
+ {"user_id": 4, "user_name": "legumes"},
+ {"user_id": 5, "user_name": "hi % there"},
+ ],
)
for expr, result in (
users = self.tables.users
connection.execute(
users.insert(),
- {"user_id": 1, "user_name": "one"},
- {"user_id": 2, "user_name": "TwO"},
- {"user_id": 3, "user_name": "ONE"},
- {"user_id": 4, "user_name": "OnE"},
+ [
+ {"user_id": 1, "user_name": "one"},
+ {"user_id": 2, "user_name": "TwO"},
+ {"user_id": 3, "user_name": "ONE"},
+ {"user_id": 4, "user_name": "OnE"},
+ ],
)
eq_(
def test_compiled_execute(self, connection):
users = self.tables.users
- connection.execute(users.insert(), user_id=7, user_name="jack")
+ connection.execute(users.insert(), dict(user_id=7, user_name="jack"))
s = (
select(users)
.where(users.c.user_id == bindparam("id"))
.compile(connection)
)
- eq_(connection.execute(s, id=7).first()._mapping["user_id"], 7)
+ eq_(connection.execute(s, dict(id=7)).first()._mapping["user_id"], 7)
def test_compiled_insert_execute(self, connection):
users = self.tables.users
connection.execute(
- users.insert().compile(connection), user_id=7, user_name="jack"
+ users.insert().compile(connection),
+ dict(user_id=7, user_name="jack"),
)
s = (
select(users)
.where(users.c.user_id == bindparam("id"))
.compile(connection)
)
- eq_(connection.execute(s, id=7).first()._mapping["user_id"], 7)
+ eq_(connection.execute(s, dict(id=7)).first()._mapping["user_id"], 7)
def test_repeated_bindparams(self, connection):
"""Tests that a BindParam can be used more than once.
"""
users = self.tables.users
- connection.execute(users.insert(), user_id=7, user_name="jack")
- connection.execute(users.insert(), user_id=8, user_name="fred")
+ connection.execute(users.insert(), dict(user_id=7, user_name="jack"))
+ connection.execute(users.insert(), dict(user_id=8, user_name="fred"))
u = bindparam("userid")
s = users.select(and_(users.c.user_name == u, users.c.user_name == u))
- r = connection.execute(s, userid="fred").fetchall()
+ r = connection.execute(s, dict(userid="fred")).fetchall()
assert len(r) == 1
def test_bindparam_detection(self):
users = self.tables.users
- connection.execute(users.insert(), user_id=1, user_name="c")
- connection.execute(users.insert(), user_id=2, user_name="b")
- connection.execute(users.insert(), user_id=3, user_name="a")
+ connection.execute(users.insert(), dict(user_id=1, user_name="c"))
+ connection.execute(users.insert(), dict(user_id=2, user_name="b"))
+ connection.execute(users.insert(), dict(user_id=3, user_name="a"))
def a_eq(executable, wanted):
got = list(connection.execute(executable))
users = self.tables.users
- connection.execute(users.insert(), user_id=1)
- connection.execute(users.insert(), user_id=2, user_name="b")
- connection.execute(users.insert(), user_id=3, user_name="a")
+ connection.execute(users.insert(), dict(user_id=1))
+ connection.execute(users.insert(), dict(user_id=2, user_name="b"))
+ connection.execute(users.insert(), dict(user_id=3, user_name="a"))
def a_eq(executable, wanted):
got = list(connection.execute(executable))
"""test the behavior of the in_() function."""
users = self.tables.users
- connection.execute(users.insert(), user_id=7, user_name="jack")
- connection.execute(users.insert(), user_id=8, user_name="fred")
- connection.execute(users.insert(), user_id=9, user_name=None)
+ connection.execute(users.insert(), dict(user_id=7, user_name="jack"))
+ connection.execute(users.insert(), dict(user_id=8, user_name="fred"))
+ connection.execute(users.insert(), dict(user_id=9, user_name=None))
s = users.select(users.c.user_name.in_([]))
r = connection.execute(s).fetchall()
users = self.tables.users
- connection.execute(users.insert(), user_id=7, user_name="jack")
- connection.execute(users.insert(), user_id=8, user_name="fred")
- connection.execute(users.insert(), user_id=9, user_name=None)
+ connection.execute(users.insert(), dict(user_id=7, user_name="jack"))
+ connection.execute(users.insert(), dict(user_id=8, user_name="fred"))
+ connection.execute(users.insert(), dict(user_id=9, user_name=None))
u = bindparam("search_key", type_=String)
s = users.select(not_(u.in_([])))
- r = connection.execute(s, search_key="john").fetchall()
+ r = connection.execute(s, dict(search_key="john")).fetchall()
assert len(r) == 3
- r = connection.execute(s, search_key=None).fetchall()
+ r = connection.execute(s, dict(search_key=None)).fetchall()
assert len(r) == 3
def test_literal_in(self, connection):
users = self.tables.users
- connection.execute(users.insert(), user_id=7, user_name="jack")
- connection.execute(users.insert(), user_id=8, user_name="fred")
- connection.execute(users.insert(), user_id=9, user_name=None)
+ connection.execute(users.insert(), dict(user_id=7, user_name="jack"))
+ connection.execute(users.insert(), dict(user_id=8, user_name="fred"))
+ connection.execute(users.insert(), dict(user_id=9, user_name=None))
s = users.select(not_(literal("john").in_([])))
r = connection.execute(s).fetchall()
def _assert_raises(self, stmt, params):
with testing.db.connect() as conn:
- assert_raises_message(
- exc.StatementError,
- "A value is required for bind parameter 'x'",
- conn.execute,
- stmt,
- **params
- )
-
assert_raises_message(
exc.StatementError,
"A value is required for bind parameter 'x'",
def insert_data(cls, connection):
users, addresses = cls.tables("users", "addresses")
conn = connection
- conn.execute(users.insert(), user_id=1, user_name="john")
+ conn.execute(users.insert(), dict(user_id=1, user_name="john"))
conn.execute(
- addresses.insert(), address_id=1, user_id=1, address="addr1"
+ addresses.insert(), dict(address_id=1, user_id=1, address="addr1")
)
- conn.execute(users.insert(), user_id=2, user_name="jack")
+ conn.execute(users.insert(), dict(user_id=2, user_name="jack"))
conn.execute(
- addresses.insert(), address_id=2, user_id=2, address="addr1"
+ addresses.insert(), dict(address_id=2, user_id=2, address="addr1")
)
- conn.execute(users.insert(), user_id=3, user_name="ed")
+ conn.execute(users.insert(), dict(user_id=3, user_name="ed"))
conn.execute(
- addresses.insert(), address_id=3, user_id=3, address="addr2"
+ addresses.insert(), dict(address_id=3, user_id=3, address="addr2")
)
- conn.execute(users.insert(), user_id=4, user_name="wendy")
+ conn.execute(users.insert(), dict(user_id=4, user_name="wendy"))
conn.execute(
- addresses.insert(), address_id=4, user_id=4, address="addr3"
+ addresses.insert(), dict(address_id=4, user_id=4, address="addr3")
)
- conn.execute(users.insert(), user_id=5, user_name="laura")
+ conn.execute(users.insert(), dict(user_id=5, user_name="laura"))
conn.execute(
- addresses.insert(), address_id=5, user_id=5, address="addr4"
+ addresses.insert(), dict(address_id=5, user_id=5, address="addr4")
)
- conn.execute(users.insert(), user_id=6, user_name="ralph")
+ conn.execute(users.insert(), dict(user_id=6, user_name="ralph"))
conn.execute(
- addresses.insert(), address_id=6, user_id=6, address="addr5"
+ addresses.insert(), dict(address_id=6, user_id=6, address="addr5")
)
- conn.execute(users.insert(), user_id=7, user_name="fido")
+ conn.execute(users.insert(), dict(user_id=7, user_name="fido"))
conn.execute(
- addresses.insert(), address_id=7, user_id=7, address="addr5"
+ addresses.insert(), dict(address_id=7, user_id=7, address="addr5")
)
def test_select_limit(self, connection):
conn.execute(
t1.insert(),
- {"t1_id": 10, "name": "t1 #10"},
- {"t1_id": 11, "name": "t1 #11"},
- {"t1_id": 12, "name": "t1 #12"},
+ [
+ {"t1_id": 10, "name": "t1 #10"},
+ {"t1_id": 11, "name": "t1 #11"},
+ {"t1_id": 12, "name": "t1 #12"},
+ ],
)
conn.execute(
t2.insert(),
- {"t2_id": 20, "t1_id": 10, "name": "t2 #20"},
- {"t2_id": 21, "t1_id": 11, "name": "t2 #21"},
+ [
+ {"t2_id": 20, "t1_id": 10, "name": "t2 #20"},
+ {"t2_id": 21, "t1_id": 11, "name": "t2 #21"},
+ ],
+ )
+ conn.execute(
+ t3.insert(), [{"t3_id": 30, "t2_id": 20, "name": "t3 #30"}]
)
- conn.execute(t3.insert(), {"t3_id": 30, "t2_id": 20, "name": "t3 #30"})
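This hunk covers the multi-row case: where a statement was executed against several parameter sets at once, the former variadic form (each row as a separate positional dictionary) becomes a single list of dictionaries, which sends the statement down the DBAPI executemany() path. A sketch under the same assumptions (t1 is the table created by this test, not defined here):

    # legacy "multiparams" form:
    #     conn.execute(t1.insert(), {"t1_id": 10, ...}, {"t1_id": 11, ...})
    # 1.4/2.0 form, one list of dictionaries -> cursor.executemany():
    conn.execute(
        t1.insert(),
        [
            {"t1_id": 10, "name": "t1 #10"},
            {"t1_id": 11, "name": "t1 #11"},
        ],
    )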
def assertRows(self, statement, expected):
"""Execute a statement and assert that rows returned equal expected."""
connection.execute(
users.insert(),
- {"user_id": 7, "user_name": "jack"},
- {"user_id": 8, "user_name": "ed"},
- {"user_id": 9, "user_name": "fred"},
+ [
+ {"user_id": 7, "user_name": "jack"},
+ {"user_id": 8, "user_name": "ed"},
+ {"user_id": 9, "user_name": "fred"},
+ ],
)
r = connection.execute(users.select())
rows = []
connection.execute(
users.insert(),
- {"user_id": 7, "user_name": "jack"},
- {"user_id": 8, "user_name": "ed"},
- {"user_id": 9, "user_name": "fred"},
+ [
+ {"user_id": 7, "user_name": "jack"},
+ {"user_id": 8, "user_name": "ed"},
+ {"user_id": 9, "user_name": "fred"},
+ ],
)
r = connection.execute(users.select())
rows = []
connection.execute(
users.insert(),
- {"user_id": 7, "user_name": "jack"},
- {"user_id": 8, "user_name": "ed"},
- {"user_id": 9, "user_name": "fred"},
+ [
+ {"user_id": 7, "user_name": "jack"},
+ {"user_id": 8, "user_name": "ed"},
+ {"user_id": 9, "user_name": "fred"},
+ ],
)
sel = (
def test_row_comparison(self, connection):
users = self.tables.users
- connection.execute(users.insert(), user_id=7, user_name="jack")
+ connection.execute(users.insert(), dict(user_id=7, user_name="jack"))
rp = connection.execute(users.select()).first()
eq_(rp, rp)
connection.execute(
users.insert(),
- {"user_id": 7, "user_name": "jack"},
- {"user_id": 8, "user_name": "ed"},
- {"user_id": 9, "user_name": "fred"},
+ [
+ {"user_id": 7, "user_name": "jack"},
+ {"user_id": 8, "user_name": "ed"},
+ {"user_id": 9, "user_name": "fred"},
+ ],
)
for pickle in False, True:
users = self.tables.users
addresses = self.tables.addresses
- connection.execute(users.insert(), user_id=1, user_name="john")
- connection.execute(users.insert(), user_id=2, user_name="jack")
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
+ connection.execute(users.insert(), dict(user_id=2, user_name="jack"))
connection.execute(
- addresses.insert(), address_id=1, user_id=2, address="foo@bar.com"
+ addresses.insert(),
+ dict(address_id=1, user_id=2, address="foo@bar.com"),
)
r = connection.execute(text("select * from addresses")).first()
users = self.tables.users
addresses = self.tables.addresses
- connection.execute(users.insert(), user_id=1, user_name="john")
- connection.execute(users.insert(), user_id=2, user_name="jack")
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
+ connection.execute(users.insert(), dict(user_id=2, user_name="jack"))
connection.execute(
- addresses.insert(), address_id=1, user_id=2, address="foo@bar.com"
+ addresses.insert(),
+ dict(address_id=1, user_id=2, address="foo@bar.com"),
)
r = connection.execute(text("select * from addresses"))
connection.execute(
users.insert(),
- dict(user_id=1, user_name="john"),
- dict(user_id=2, user_name="jack"),
+ [
+ dict(user_id=1, user_name="john"),
+ dict(user_id=2, user_name="jack"),
+ ],
)
r = connection.execute(users.select(users.c.user_id == 2)).first()
connection.execute(
users.insert(),
- dict(user_id=1, user_name="john"),
- dict(user_id=2, user_name="jack"),
+ [
+ dict(user_id=1, user_name="john"),
+ dict(user_id=2, user_name="jack"),
+ ],
)
r = connection.execute(users.select(users.c.user_id == 2)).first()
connection.execute(
users.insert(),
- dict(user_id=1, user_name="john"),
- dict(user_id=2, user_name="jack"),
+ [
+ dict(user_id=1, user_name="john"),
+ dict(user_id=2, user_name="jack"),
+ ],
)
r = connection.execute(
connection.execute(
users.insert(),
- dict(user_id=1, user_name="john"),
- dict(user_id=2, user_name="jack"),
+ [
+ dict(user_id=1, user_name="john"),
+ dict(user_id=2, user_name="jack"),
+ ],
)
r = connection.execute(
text("select * from users where user_id=2")
connection.execute(
users.insert(),
- dict(user_id=1, user_name="john"),
- dict(user_id=2, user_name="jack"),
+ [
+ dict(user_id=1, user_name="john"),
+ dict(user_id=2, user_name="jack"),
+ ],
)
r = connection.execute(
text("select * from users where user_id=2").columns(
connection.execute(
users.insert(),
- dict(user_id=1, user_name="john"),
- dict(user_id=2, user_name="jack"),
+ [
+ dict(user_id=1, user_name="john"),
+ dict(user_id=2, user_name="jack"),
+ ],
)
# this will create column() objects inside
# the select(), these need to match on name anyway
lambda r: r.cursor_strategy.fetchone(r, r.cursor),
]:
trans = conn.begin()
- result = conn.execute(users.insert(), user_id=1)
+ result = conn.execute(users.insert(), dict(user_id=1))
assert_raises_message(
exc.ResourceClosedError,
"This result object does not return rows. "
def test_row_as_args(self, connection):
users = self.tables.users
- connection.execute(users.insert(), user_id=1, user_name="john")
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
r = connection.execute(users.select(users.c.user_id == 1)).first()
connection.execute(users.delete())
connection.execute(users.insert(), r._mapping)
connection.execute(users2.delete())
r = connection.execute(users.select())
- connection.execute(users2.insert(), *[row._mapping for row in r])
+ connection.execute(users2.insert(), [row._mapping for row in r])
eq_(
connection.execute(
users2.select().order_by(users2.c.user_id)
users = self.tables.users
addresses = self.tables.addresses
- connection.execute(users.insert(), user_id=1, user_name="john")
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
result = connection.execute(users.outerjoin(addresses).select())
r = result.first()
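In test_row_as_args above, the parameter source is not a literal dictionary but the RowMapping obtained from row._mapping; a RowMapping is accepted anywhere a parameter dictionary is, and a list of them is accepted as an executemany parameter list, which is why the former *[row._mapping for row in r] star-unpacking collapses into a plain list. A sketch, reusing the users/users2 fixture tables assumed by that test:

    row = connection.execute(users.select()).first()
    connection.execute(users.delete())
    # a RowMapping stands in for a parameter dictionary
    connection.execute(users.insert(), row._mapping)

    result = connection.execute(users.select())
    # a list of mappings becomes an executemany parameter list
    connection.execute(users2.insert(), [r._mapping for r in result])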
def test_ambiguous_column_by_col(self, connection):
users = self.tables.users
- connection.execute(users.insert(), user_id=1, user_name="john")
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
ua = users.alias()
u2 = users.alias()
result = connection.execute(
# ticket 2702. in 0.7 we'd get True, False.
# in 0.8, both columns are present so it's True;
# but when they're fetched you'll get the ambiguous error.
- connection.execute(users.insert(), user_id=1, user_name="john")
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
result = connection.execute(
select(users.c.user_id, addresses.c.user_id).select_from(
users.outerjoin(addresses)
def test_ambiguous_column_by_col_plus_label(self, connection):
users = self.tables.users
- connection.execute(users.insert(), user_id=1, user_name="john")
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
result = connection.execute(
select(
users.c.user_id,
def test_fetch_partial_result_map(self, connection):
users = self.tables.users
- connection.execute(users.insert(), user_id=7, user_name="ed")
+ connection.execute(users.insert(), dict(user_id=7, user_name="ed"))
t = text("select * from users").columns(user_name=String())
eq_(connection.execute(t).fetchall(), [(7, "ed")])
def test_fetch_unordered_result_map(self, connection):
users = self.tables.users
- connection.execute(users.insert(), user_id=7, user_name="ed")
+ connection.execute(users.insert(), dict(user_id=7, user_name="ed"))
class Goofy1(TypeDecorator):
impl = String
def test_column_label_targeting(self, connection):
users = self.tables.users
- connection.execute(users.insert(), user_id=7, user_name="ed")
+ connection.execute(users.insert(), dict(user_id=7, user_name="ed"))
for s in (
users.select().alias("foo"),
def test_ro_mapping_py3k(self, connection):
users = self.tables.users
- connection.execute(users.insert(), user_id=1, user_name="foo")
+ connection.execute(users.insert(), dict(user_id=1, user_name="foo"))
result = connection.execute(users.select())
row = result.first()
def test_ro_mapping_py2k(self, connection):
users = self.tables.users
- connection.execute(users.insert(), user_id=1, user_name="foo")
+ connection.execute(users.insert(), dict(user_id=1, user_name="foo"))
result = connection.execute(users.select())
row = result.first()
users = self.tables.users
addresses = self.tables.addresses
- connection.execute(users.insert(), user_id=1, user_name="foo")
+ connection.execute(users.insert(), dict(user_id=1, user_name="foo"))
result = connection.execute(users.select())
obj = get_object(result)
def test_row_mapping_keys(self, connection):
users = self.tables.users
- connection.execute(users.insert(), user_id=1, user_name="foo")
+ connection.execute(users.insert(), dict(user_id=1, user_name="foo"))
result = connection.execute(users.select())
eq_(result.keys(), ["user_id", "user_name"])
row = result.first()
def test_row_keys_legacy_dont_warn(self, connection):
users = self.tables.users
- connection.execute(users.insert(), user_id=1, user_name="foo")
+ connection.execute(users.insert(), dict(user_id=1, user_name="foo"))
result = connection.execute(users.select())
row = result.first()
# DO NOT WARN DEPRECATED IN 1.x, ONLY 2.0 WARNING
def test_row_namedtuple_legacy_ok(self, connection):
users = self.tables.users
- connection.execute(users.insert(), user_id=1, user_name="foo")
+ connection.execute(users.insert(), dict(user_id=1, user_name="foo"))
result = connection.execute(users.select())
row = result.first()
eq_(row.user_id, 1)
users = self.tables.users
- connection.execute(users.insert(), user_id=1, user_name="foo")
+ connection.execute(users.insert(), dict(user_id=1, user_name="foo"))
result = connection.execute(
select(
users.c.user_id,
def test_items(self, connection):
users = self.tables.users
- connection.execute(users.insert(), user_id=1, user_name="foo")
+ connection.execute(users.insert(), dict(user_id=1, user_name="foo"))
r = connection.execute(users.select()).first()
eq_(
[(x[0].lower(), x[1]) for x in list(r._mapping.items())],
connection.execute(
users.insert(),
- dict(user_id=1, user_name="foo"),
- dict(user_id=2, user_name="bar"),
- dict(user_id=3, user_name="def"),
+ [
+ dict(user_id=1, user_name="foo"),
+ dict(user_id=2, user_name="bar"),
+ dict(user_id=3, user_name="def"),
+ ],
)
rows = connection.execute(
# should return values in column definition order
users = self.tables.users
- connection.execute(users.insert(), user_id=1, user_name="foo")
+ connection.execute(users.insert(), dict(user_id=1, user_name="foo"))
r = connection.execute(users.select(users.c.user_id == 1)).first()
eq_(r[0], 1)
eq_(r[1], "foo")
self.metadata.create_all(connection)
connection.execute(
shadowed.insert(),
- shadow_id=1,
- shadow_name="The Shadow",
- parent="The Light",
- row="Without light there is no shadow",
- _parent="Hidden parent",
- _row="Hidden row",
+ dict(
+ shadow_id=1,
+ shadow_name="The Shadow",
+ parent="The Light",
+ row="Without light there is no shadow",
+ _parent="Hidden parent",
+ _row="Hidden row",
+ ),
)
r = connection.execute(
shadowed.select(shadowed.c.shadow_id == 1)
with engine.begin() as conn:
mock_rowcount.__get__ = Mock()
conn.execute(
- t.insert(), {"data": "d1"}, {"data": "d2"}, {"data": "d3"}
+ t.insert(),
+ [{"data": "d1"}, {"data": "d2"}, {"data": "d3"}],
)
eq_(len(mock_rowcount.__get__.mock_calls), 0)
def test_insert(self, connection):
table = self.tables.tables
result = connection.execute(
- table.insert().returning(table.c.foo_id), data="somedata"
+ table.insert().returning(table.c.foo_id), dict(data="somedata")
)
row = result.first()._mapping
assert row[table.c.foo_id] == row["id"] == 1
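The RETURNING variant keeps the same single-dictionary form; the server-generated value is then read from the result rows rather than from the insert's implicit primary-key accessors. A sketch only, assuming a backend with RETURNING support and a table with a server-generated id column plus a data column (names here are illustrative, not taken from the fixture):

    result = connection.execute(
        table.insert().returning(table.c.id),
        {"data": "somedata"},
    )
    generated_id = result.first()[0]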
def test_round_trip(self, connection):
connection.execute(
self.tables.test_table.insert(),
- {"x": "X1", "y": "Y1"},
- {"x": "X2", "y": "Y2"},
- {"x": "X3", "y": "Y3"},
+ [
+ {"x": "X1", "y": "Y1"},
+ {"x": "X2", "y": "Y2"},
+ {"x": "X3", "y": "Y3"},
+ ],
)
# test insert coercion alone
t = Table("t", self.metadata, Column("x", variant))
t.create(connection)
- connection.execute(t.insert(), x="foo")
+ connection.execute(t.insert(), dict(x="foo"))
eq_(connection.scalar(select(t.c.x).where(t.c.x == "foo")), "fooUTWO")
t.create(connection)
connection.execute(
- t.insert(), x=datetime.datetime(2015, 4, 18, 10, 15, 17, 4839)
+ t.insert(),
+ dict(x=datetime.datetime(2015, 4, 18, 10, 15, 17, 4839)),
)
eq_(
stream2 = self.load_stream("binary_data_two.dat")
connection.execute(
binary_table.insert(),
- primary_id=1,
- misc="binary_data_one.dat",
- data=stream1,
- data_slice=stream1[0:100],
- pickled=testobj1,
- mypickle=testobj3,
+ dict(
+ primary_id=1,
+ misc="binary_data_one.dat",
+ data=stream1,
+ data_slice=stream1[0:100],
+ pickled=testobj1,
+ mypickle=testobj3,
+ ),
)
connection.execute(
binary_table.insert(),
- primary_id=2,
- misc="binary_data_two.dat",
- data=stream2,
- data_slice=stream2[0:99],
- pickled=testobj2,
+ dict(
+ primary_id=2,
+ misc="binary_data_two.dat",
+ data=stream2,
+ data_slice=stream2[0:99],
+ pickled=testobj2,
+ ),
)
connection.execute(
binary_table.insert(),
- primary_id=3,
- misc="binary_data_two.dat",
- data=None,
- data_slice=stream2[0:99],
- pickled=None,
+ dict(
+ primary_id=3,
+ misc="binary_data_two.dat",
+ data=None,
+ data_slice=stream2[0:99],
+ pickled=None,
+ ),
)
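Note that several of these dictionaries pass None explicitly (data=None, pickled=None). With an explicit None the column is included in the INSERT and bound as SQL NULL; omitting the key from the dictionary leaves the column out of the statement entirely, so any column or server default would apply instead. A sketch with an illustrative table t (not part of this fixture) having columns x and y:

    # explicit None: x appears in the INSERT, bound as NULL
    connection.execute(t.insert(), {"x": None, "y": "value"})
    # key omitted: x is left out, so its default (if any) applies
    connection.execute(t.insert(), {"y": "value"})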
for stmt in (
assert isinstance(expr.right.type, LargeBinary)
data = os.urandom(32)
- connection.execute(binary_table.insert(), data=data)
+ connection.execute(binary_table.insert(), dict(data=data))
eq_(
connection.scalar(
select(func.count("*"))
def _fixture(self, connection, metadata, type_, data):
t = Table("t", metadata, Column("val", type_))
metadata.create_all(connection)
- connection.execute(t.insert(), val=data)
+ connection.execute(t.insert(), dict(val=data))
@testing.fails_on("sqlite", "Doesn't provide Decimal results natively")
@testing.provide_metadata
delta = datetime.timedelta(14)
connection.execute(
interval_table.insert(),
- native_interval=small_delta,
- native_interval_args=delta,
- non_native_interval=delta,
+ dict(
+ native_interval=small_delta,
+ native_interval_args=delta,
+ non_native_interval=delta,
+ ),
)
row = connection.execute(interval_table.select()).first()
eq_(row.native_interval, small_delta)
connection.execute(
interval_table.insert(),
- id=1,
- native_inverval=None,
- non_native_interval=None,
+ dict(
+ id=1,
+ native_inverval=None,
+ non_native_interval=None,
+ ),
)
row = connection.execute(interval_table.select()).first()
eq_(row.native_interval, None)