from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import select
-from sqlalchemy import Sequence
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
Column(
"employee_id",
Integer,
- Sequence("employee_id_seq", optional=True),
+ autoincrement=False,
primary_key=True,
),
Column("name", String(50)),
employees_table = cls.tables.employees
connection.execute(
employees_table.insert(),
- [{"name": n, "department": d} for n, d in data],
+ [
+ {"employee_id": i, "name": n, "department": d}
+ for i, (n, d) in enumerate(data)
+ ],
)
def test_basic(self, connection):
def test_round_trip(self, connection):
unicode_table = self.tables.unicode_table
- connection.execute(unicode_table.insert(), {"unicode_data": self.data})
+ connection.execute(
+ unicode_table.insert(), {"id": 1, "unicode_data": self.data}
+ )
row = connection.execute(select(unicode_table.c.unicode_data)).first()
connection.execute(
unicode_table.insert(),
- [{"unicode_data": self.data} for i in range(3)],
+ [{"id": i, "unicode_data": self.data} for i in range(1, 4)],
)
rows = connection.execute(
select(unicode_table.c.unicode_data)
).fetchall()
- eq_(rows, [(self.data,) for i in range(3)])
+ eq_(rows, [(self.data,) for i in range(1, 4)])
for row in rows:
assert isinstance(row[0], util.text_type)
def _test_null_strings(self, connection):
unicode_table = self.tables.unicode_table
- connection.execute(unicode_table.insert(), {"unicode_data": None})
+ connection.execute(
+ unicode_table.insert(), {"id": 1, "unicode_data": None}
+ )
row = connection.execute(select(unicode_table.c.unicode_data)).first()
eq_(row, (None,))
def _test_empty_strings(self, connection):
unicode_table = self.tables.unicode_table
- connection.execute(unicode_table.insert(), {"unicode_data": u("")})
+ connection.execute(
+ unicode_table.insert(), {"id": 1, "unicode_data": u("")}
+ )
row = connection.execute(select(unicode_table.c.unicode_data)).first()
eq_(row, (u(""),))
def test_text_roundtrip(self, connection):
text_table = self.tables.text_table
- connection.execute(text_table.insert(), {"text_data": "some text"})
+ connection.execute(
+ text_table.insert(), {"id": 1, "text_data": "some text"}
+ )
row = connection.execute(select(text_table.c.text_data)).first()
eq_(row, ("some text",))
def test_text_empty_strings(self, connection):
text_table = self.tables.text_table
- connection.execute(text_table.insert(), {"text_data": ""})
+ connection.execute(text_table.insert(), {"id": 1, "text_data": ""})
row = connection.execute(select(text_table.c.text_data)).first()
eq_(row, ("",))
def test_text_null_strings(self, connection):
text_table = self.tables.text_table
- connection.execute(text_table.insert(), {"text_data": None})
+ connection.execute(text_table.insert(), {"id": 1, "text_data": None})
row = connection.execute(select(text_table.c.text_data)).first()
eq_(row, (None,))
def test_round_trip(self, connection):
date_table = self.tables.date_table
- connection.execute(date_table.insert(), {"date_data": self.data})
+ connection.execute(
+ date_table.insert(), {"id": 1, "date_data": self.data}
+ )
row = connection.execute(select(date_table.c.date_data)).first()
date_table = self.tables.date_table
connection.execute(
- date_table.insert(), {"decorated_date_data": self.data}
+ date_table.insert(), {"id": 1, "decorated_date_data": self.data}
)
row = connection.execute(
def test_null(self, connection):
date_table = self.tables.date_table
- connection.execute(date_table.insert(), {"date_data": None})
+ connection.execute(date_table.insert(), {"id": 1, "date_data": None})
row = connection.execute(select(date_table.c.date_data)).first()
eq_(row, (None,))
date_table = self.tables.date_table
with config.db.begin() as conn:
result = conn.execute(
- date_table.insert(), {"date_data": self.data}
+ date_table.insert(), {"id": 1, "date_data": self.data}
)
id_ = result.inserted_primary_key[0]
stmt = select(date_table.c.id).where(
metadata.create_all(config.db)
- connection.execute(int_table.insert(), {"integer_data": data})
+ connection.execute(
+ int_table.insert(), {"id": 1, "integer_data": data}
+ )
row = connection.execute(select(int_table.c.integer_data)).first()
data_table = self.tables.data_table
connection.execute(
- data_table.insert(), {"name": "row1", "data": data_element}
+ data_table.insert(),
+ {"id": 1, "name": "row1", "data": data_element},
)
row = connection.execute(select(data_table.c.data)).first()