# Currently not using psycopg2.execute_values() when there's a returning clause; need to add support
# for receiving multiple return values from insert query
- # Note: self.inline is true iff there are multiple parameters sets to the query
+ # Note: self.inline is true iff there are multiple parameters sets to
+ # the query
if self.inline and not returning_clause_exists and self.dialect.psycopg2_executemany_mode is EXECUTEMANY_VALUES:
self.execute_values_insert_template = "(" + \
", ".join([c[1] for c in crud_params]) + ")"
EXECUTEMANY_VALUES = util.symbol("executemany_values")
-
class PGDialect_psycopg2(PGDialect):
driver = "psycopg2"
if util.py2k:
self.client_encoding = client_encoding
if executemany_mode:
- if executemany_mode not in (EXECUTEMANY_DEFAULT.name, EXECUTEMANY_BATCH.name, EXECUTEMANY_VALUES.name):
- raise exc.ArgumentError("Unsupported value for 'executemany_mode': %s" % executemany_mode)
+ if executemany_mode not in (
+ EXECUTEMANY_DEFAULT.name,
+ EXECUTEMANY_BATCH.name,
+ EXECUTEMANY_VALUES.name):
+ raise exc.ArgumentError(
+ "Unsupported value for 'executemany_mode': %s" % executemany_mode)
self.psycopg2_executemany_mode = util.symbol(executemany_mode)
else:
self.psycopg2_executemany_mode = None
+ if not isinstance(executemany_page_size, int):
+ raise exc.ArgumentError("Wrong type for 'executemany_page_size': %s" % type(
+ executemany_page_size).__name__)
+
+ if executemany_page_size <= 0:
+ raise exc.ArgumentError(
+ "Wrong value for 'executemany_page_size': %s" % executemany_page_size)
+
self.psycopg2_executemany_page_size = executemany_page_size
self.psycopg2_batch_mode = use_batch_mode
eq_(cparams, {"host": "somehost", "any_random_thing": "yes"})
-class ExecuteBatchInsertsTest(fixtures.TablesTest):
+class UseBatchModeInsertsTest(fixtures.TablesTest):
__only_on__ = "postgresql+psycopg2"
__backend__ = True
)
def setup(self):
- super(ExecuteBatchInsertsTest, self).setup()
- self.engine = engines.testing_engine(options={"executemany_mode": "executemany_batch"})
+ super(UseBatchModeInsertsTest, self).setup()
+ self.engine = engines.testing_engine(options={"use_batch_mode": True})
def teardown(self):
self.engine.dispose()
- super(ExecuteBatchInsertsTest, self).teardown()
+ super(UseBatchModeInsertsTest, self).teardown()
def test_insert(self):
with self.engine.connect() as conn:
)
-class ExecuteValuesInsertsTest(fixtures.TablesTest):
+class ExecutemanyBatchInsertsTest(fixtures.TablesTest):
__only_on__ = "postgresql+psycopg2"
__backend__ = True
)
def setup(self):
- super(ExecuteValuesInsertsTest, self).setup()
+ super(ExecutemanyBatchInsertsTest, self).setup()
+ self.engine = engines.testing_engine(
+ options={"executemany_mode": "executemany_batch"})
+
+ def teardown(self):
+ self.engine.dispose()
+ super(ExecutemanyBatchInsertsTest, self).teardown()
+
+ def test_insert(self):
+ with self.engine.connect() as conn:
+ conn.execute(
+ self.tables.data.insert(),
+ [
+ {"x": "x1", "y": "y1"},
+ {"x": "x2", "y": "y2"},
+ {"x": "x3", "y": "y3"},
+ ],
+ )
+
+ eq_(
+ conn.execute(select([self.tables.data])).fetchall(),
+ [(1, "x1", "y1", 5), (2, "x2", "y2", 5), (3, "x3", "y3", 5)],
+ )
+
+ def test_not_sane_rowcount(self):
+ self.engine.connect().close()
+ assert not self.engine.dialect.supports_sane_multi_rowcount
+
+ def test_update(self):
+ with self.engine.connect() as conn:
+ conn.execute(
+ self.tables.data.insert(),
+ [
+ {"x": "x1", "y": "y1"},
+ {"x": "x2", "y": "y2"},
+ {"x": "x3", "y": "y3"},
+ ],
+ )
+
+ conn.execute(
+ self.tables.data.update()
+ .where(self.tables.data.c.x == bindparam("xval"))
+ .values(y=bindparam("yval")),
+ [{"xval": "x1", "yval": "y5"}, {"xval": "x3", "yval": "y6"}],
+ )
+ eq_(
+ conn.execute(
+ select([self.tables.data]).order_by(self.tables.data.c.id)
+ ).fetchall(),
+ [(1, "x1", "y5", 5), (2, "x2", "y2", 5), (3, "x3", "y6", 5)],
+ )
+
+
+class ExecutemanyValuesInsertsTest(fixtures.TablesTest):
+ __only_on__ = "postgresql+psycopg2"
+ __backend__ = True
+
+ run_create_tables = "each"
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "data",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("x", String),
+ Column("y", String),
+ Column("z", Integer, server_default="5"),
+ )
+
+ def setup(self):
+ super(ExecutemanyValuesInsertsTest, self).setup()
self.engine = engines.testing_engine(
options={"executemany_mode": "executemany_values"})
def teardown(self):
self.engine.dispose()
- super(ExecuteValuesInsertsTest, self).teardown()
+ super(ExecutemanyValuesInsertsTest, self).teardown()
def test_insert(self):
with self.engine.connect() as conn:
def test_execute_values_arguments_page_size(self):
self.engine = engines.testing_engine(
- options={"executemany_mode": "executemany_values", "executemany_page_size": 5000})
+ options={
+ "executemany_mode": "executemany_values",
+ "executemany_page_size": 5000})
def execute_values(cur, sql, argslist, template=None, page_size=100):
assert sql == "INSERT INTO data (x, y) VALUES %s"
)
+class ExecutemanyFlagOptionsTest(fixtures.TablesTest):
+ __only_on__ = "postgresql+psycopg2"
+ __backend__ = True
+
+ def test_executemany_correct_flag_options(self):
+ for opt in [
+ None,
+ "executemany_default",
+ "executemany_batch",
+ "executemany_values"]:
+ self.engine = engines.testing_engine(
+ options={"executemany_mode": opt})
+
+ def test_executemany_wrong_flag_options(self):
+ for opt in [1, True, "batch_insert"]:
+ assert_raises_message(
+ exc.ArgumentError,
+ "Unsupported value for 'executemany_mode': %s" % opt,
+ engines.testing_engine,
+ options={"executemany_mode": opt}
+ )
+
+ def test_executemany_wrong_page_size_options(self):
+ assert_raises_message(
+ exc.ArgumentError,
+ "Wrong type for 'executemany_page_size': str",
+ engines.testing_engine,
+ options={"executemany_page_size": "100"}
+ )
+ for opt in [0, -1]:
+ assert_raises_message(
+ exc.ArgumentError,
+ "Wrong value for 'executemany_page_size': %s" % opt,
+ engines.testing_engine, options={"executemany_page_size": opt})
+
class MiscBackendTest(
fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL