from .elements import GroupedElement
from .elements import Grouping
from .elements import literal_column
+from .elements import Tuple
from .elements import UnaryExpression
from .visitors import InternalTraversal
from .. import exc
self.of = None
+class Values(Generative, FromClause):
+ """represent a ``VALUES`` construct that can be used as a FROM element
+ in a statement.
+
+ The :class:`.Values` object is created from the
+ :func:`~.sql.expression.values` function.
+
+ .. versionadded:: 1.4
+
+ """
+
+ named_with_column = True
+ __visit_name__ = "values"
+
+ _data = ()
+
+ _traverse_internals = [
+ ("_column_args", InternalTraversal.dp_clauseelement_list,),
+ ("_data", InternalTraversal.dp_clauseelement_list),
+ ("name", InternalTraversal.dp_string),
+ ("literal_binds", InternalTraversal.dp_boolean),
+ ]
+
+ def __init__(self, *columns, **kw):
+ r"""Construct a :class:`.Values` construct.
+
+ The column expressions and the actual data for
+ :class:`.Values` are given in two separate steps. The
+ constructor receives the column expressions typically as
+ :func:`.column` constructs, and the data is then passed via the
+ :meth:`.Values.data` method as a list, which can be called multiple
+ times to add more data, e.g.::
+
+ from sqlalchemy import column
+ from sqlalchemy import values
+
+ value_expr = values(
+ column('id', Integer),
+ column('name', Integer),
+ name="my_values"
+ ).data(
+ [(1, 'name1'), (2, 'name2'), (3, 'name3')]
+ )
+
+ :param \*columns: column expressions, typically composed using
+ :func:`.column` objects.
+
+ :param name: the name for this VALUES construct. If omitted, the
+ VALUES construct will be unnamed in a SQL expression. Different
+ backends may have different requirements here.
+
+ :param literal_binds: Defaults to False. Whether or not to render
+ the data values inline in the SQL output, rather than using bound
+ parameters.
+
+ """
+
+ super(Values, self).__init__()
+ self._column_args = columns
+ self.name = kw.pop("name", None)
+ self.literal_binds = kw.pop("literal_binds", False)
+ self.named_with_column = self.name is not None
+
+ @_generative
+ def alias(self, name, **kw):
+ """Return a new :class:`.Values` construct that is a copy of this
+ one with the given name.
+
+ This method is a VALUES-specific specialization of the
+ :class:`.FromClause.alias` method.
+
+ .. seealso::
+
+ :ref:`core_tutorial_aliases`
+
+ :func:`~.expression.alias`
+
+ """
+ self.name = name
+ self.named_with_column = self.name is not None
+
+ @_generative
+ def lateral(self, name=None):
+ """Return a new :class:`.Values` with the lateral flag set, so that
+ it renders as LATERAL.
+
+ .. seealso::
+
+ :func:`~.expression.lateral`
+
+ """
+ self._is_lateral = True
+ if name is not None:
+ self.name = name
+
+ @_generative
+ def data(self, values):
+ """Return a new :class:`.Values` construct, adding the given data
+ to the data list.
+
+ E.g.::
+
+ my_values = my_values.data([(1, 'value 1'), (2, 'value2')])
+
+ :param values: a sequence (i.e. list) of tuples that map to the
+ column expressions given in the :class:`.Values` constructor.
+
+ """
+
+ self._data += tuple(Tuple(*row).self_group() for row in values)
+
+ def _populate_column_collection(self):
+ for c in self._column_args:
+ self._columns.add(c)
+ c.table = self
+
+ @property
+ def _from_objects(self):
+ return [self]
+
+
class SelectBase(
roles.SelectStatementRole,
roles.DMLSelectRole,
from sqlalchemy import union
from sqlalchemy import union_all
from sqlalchemy import util
+from sqlalchemy import values
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import Sequence
table("a", column("q"), column("y", Integer)),
),
lambda: (table_a, table_b),
+ lambda: (
+ values(
+ column("mykey", Integer),
+ column("mytext", String),
+ column("myint", Integer),
+ name="myvalues",
+ ).data([(1, "textA", 99), (2, "textB", 88)]),
+ values(
+ column("mykey", Integer),
+ column("mytext", String),
+ column("myint", Integer),
+ name="myothervalues",
+ ).data([(1, "textA", 99), (2, "textB", 88)]),
+ values(
+ column("mykey", Integer),
+ column("mytext", String),
+ column("myint", Integer),
+ name="myvalues",
+ ).data([(1, "textA", 89), (2, "textG", 88)]),
+ values(
+ column("mykey", Integer),
+ column("mynottext", String),
+ column("myint", Integer),
+ name="myvalues",
+ ).data([(1, "textA", 99), (2, "textB", 88)]),
+ # TODO: difference in type
+ # values(
+ # [
+ # column("mykey", Integer),
+ # column("mytext", Text),
+ # column("myint", Integer),
+ # ],
+ # (1, "textA", 99),
+ # (2, "textB", 88),
+ # alias_name="myvalues",
+ # ),
+ ),
]
dont_compare_values_fixtures = [
--- /dev/null
+from sqlalchemy import alias
+from sqlalchemy import Column
+from sqlalchemy import column
+from sqlalchemy import ForeignKey
+from sqlalchemy import Integer
+from sqlalchemy import String
+from sqlalchemy import Table
+from sqlalchemy import testing
+from sqlalchemy import true
+from sqlalchemy.engine import default
+from sqlalchemy.sql import select
+from sqlalchemy.sql import Values
+from sqlalchemy.sql.compiler import FROM_LINTING
+from sqlalchemy.testing import AssertsCompiledSQL
+from sqlalchemy.testing import fixtures
+
+
+class ValuesTest(fixtures.TablesTest, AssertsCompiledSQL):
+ __dialect__ = default.DefaultDialect(supports_native_boolean=True)
+
+ run_setup_bind = None
+
+ run_create_tables = None
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "people",
+ metadata,
+ Column("people_id", Integer, primary_key=True),
+ Column("age", Integer),
+ Column("name", String(30)),
+ )
+ Table(
+ "bookcases",
+ metadata,
+ Column("bookcase_id", Integer, primary_key=True),
+ Column(
+ "bookcase_owner_id", Integer, ForeignKey("people.people_id")
+ ),
+ Column("bookcase_shelves", Integer),
+ Column("bookcase_width", Integer),
+ )
+ Table(
+ "books",
+ metadata,
+ Column("book_id", Integer, primary_key=True),
+ Column(
+ "bookcase_id", Integer, ForeignKey("bookcases.bookcase_id")
+ ),
+ Column("book_owner_id", Integer, ForeignKey("people.people_id")),
+ Column("book_weight", Integer),
+ )
+
+ def test_column_quoting(self):
+ v1 = Values(
+ column("CaseSensitive", Integer),
+ column("has spaces", String),
+ name="Spaces and Cases",
+ ).data([(1, "textA", 99), (2, "textB", 88)])
+ self.assert_compile(
+ select([v1]),
+ 'SELECT "Spaces and Cases"."CaseSensitive", '
+ '"Spaces and Cases"."has spaces" FROM '
+ "(VALUES (:param_1, :param_2, :param_3), "
+ "(:param_4, :param_5, :param_6)) "
+ 'AS "Spaces and Cases" ("CaseSensitive", "has spaces")',
+ )
+
+ @testing.fixture
+ def literal_parameter_fixture(self):
+ def go(literal_binds):
+ return Values(
+ column("mykey", Integer),
+ column("mytext", String),
+ column("myint", Integer),
+ name="myvalues",
+ literal_binds=literal_binds,
+ ).data([(1, "textA", 99), (2, "textB", 88)])
+
+ return go
+
+ def test_bound_parameters(self, literal_parameter_fixture):
+ literal_parameter_fixture = literal_parameter_fixture(False)
+
+ stmt = select([literal_parameter_fixture])
+
+ self.assert_compile(
+ stmt,
+ "SELECT myvalues.mykey, myvalues.mytext, myvalues.myint FROM "
+ "(VALUES (:param_1, :param_2, :param_3), "
+ "(:param_4, :param_5, :param_6)"
+ ") AS myvalues (mykey, mytext, myint)",
+ checkparams={
+ "param_1": 1,
+ "param_2": "textA",
+ "param_3": 99,
+ "param_4": 2,
+ "param_5": "textB",
+ "param_6": 88,
+ },
+ )
+
+ def test_literal_parameters(self, literal_parameter_fixture):
+ literal_parameter_fixture = literal_parameter_fixture(True)
+
+ stmt = select([literal_parameter_fixture])
+
+ self.assert_compile(
+ stmt,
+ "SELECT myvalues.mykey, myvalues.mytext, myvalues.myint FROM "
+ "(VALUES (1, 'textA', 99), (2, 'textB', 88)"
+ ") AS myvalues (mykey, mytext, myint)",
+ checkparams={},
+ )
+
+ def test_with_join_unnamed(self):
+ people = self.tables.people
+ values = Values(
+ column("column1", Integer), column("column2", Integer),
+ ).data([(1, 1), (2, 1), (3, 2), (3, 3)])
+ stmt = select([people, values]).select_from(
+ people.join(values, values.c.column2 == people.c.people_id)
+ )
+ self.assert_compile(
+ stmt,
+ "SELECT people.people_id, people.age, people.name, column1, "
+ "column2 FROM people JOIN (VALUES (:param_1, :param_2), "
+ "(:param_3, :param_4), (:param_5, :param_6), "
+ "(:param_7, :param_8)) "
+ "ON people.people_id = column2",
+ checkparams={
+ "param_1": 1,
+ "param_2": 1,
+ "param_3": 2,
+ "param_4": 1,
+ "param_5": 3,
+ "param_6": 2,
+ "param_7": 3,
+ "param_8": 3,
+ },
+ )
+
+ def test_with_join_named(self):
+ people = self.tables.people
+ values = Values(
+ column("bookcase_id", Integer),
+ column("bookcase_owner_id", Integer),
+ name="bookcases",
+ ).data([(1, 1), (2, 1), (3, 2), (3, 3)])
+ stmt = select([people, values]).select_from(
+ people.join(
+ values, values.c.bookcase_owner_id == people.c.people_id
+ )
+ )
+ self.assert_compile(
+ stmt,
+ "SELECT people.people_id, people.age, people.name, "
+ "bookcases.bookcase_id, bookcases.bookcase_owner_id FROM people "
+ "JOIN (VALUES (:param_1, :param_2), (:param_3, :param_4), "
+ "(:param_5, :param_6), (:param_7, :param_8)) AS bookcases "
+ "(bookcase_id, bookcase_owner_id) "
+ "ON people.people_id = bookcases.bookcase_owner_id",
+ checkparams={
+ "param_1": 1,
+ "param_2": 1,
+ "param_3": 2,
+ "param_4": 1,
+ "param_5": 3,
+ "param_6": 2,
+ "param_7": 3,
+ "param_8": 3,
+ },
+ )
+
+ def test_with_aliased_join(self):
+ people = self.tables.people
+ values = (
+ Values(
+ column("bookcase_id", Integer),
+ column("bookcase_owner_id", Integer),
+ )
+ .data([(1, 1), (2, 1), (3, 2), (3, 3)])
+ .alias("bookcases")
+ )
+ stmt = select([people, values]).select_from(
+ people.join(
+ values, values.c.bookcase_owner_id == people.c.people_id
+ )
+ )
+ self.assert_compile(
+ stmt,
+ "SELECT people.people_id, people.age, people.name, "
+ "bookcases.bookcase_id, bookcases.bookcase_owner_id FROM people "
+ "JOIN (VALUES (:param_1, :param_2), (:param_3, :param_4), "
+ "(:param_5, :param_6), (:param_7, :param_8)) AS bookcases "
+ "(bookcase_id, bookcase_owner_id) "
+ "ON people.people_id = bookcases.bookcase_owner_id",
+ checkparams={
+ "param_1": 1,
+ "param_2": 1,
+ "param_3": 2,
+ "param_4": 1,
+ "param_5": 3,
+ "param_6": 2,
+ "param_7": 3,
+ "param_8": 3,
+ },
+ )
+
+ def test_with_standalone_aliased_join(self):
+ people = self.tables.people
+ values = Values(
+ column("bookcase_id", Integer),
+ column("bookcase_owner_id", Integer),
+ ).data([(1, 1), (2, 1), (3, 2), (3, 3)])
+ values = alias(values, "bookcases")
+
+ stmt = select([people, values]).select_from(
+ people.join(
+ values, values.c.bookcase_owner_id == people.c.people_id
+ )
+ )
+ self.assert_compile(
+ stmt,
+ "SELECT people.people_id, people.age, people.name, "
+ "bookcases.bookcase_id, bookcases.bookcase_owner_id FROM people "
+ "JOIN (VALUES (:param_1, :param_2), (:param_3, :param_4), "
+ "(:param_5, :param_6), (:param_7, :param_8)) AS bookcases "
+ "(bookcase_id, bookcase_owner_id) "
+ "ON people.people_id = bookcases.bookcase_owner_id",
+ checkparams={
+ "param_1": 1,
+ "param_2": 1,
+ "param_3": 2,
+ "param_4": 1,
+ "param_5": 3,
+ "param_6": 2,
+ "param_7": 3,
+ "param_8": 3,
+ },
+ )
+
+ def test_lateral(self):
+ people = self.tables.people
+ values = (
+ Values(
+ column("bookcase_id", Integer),
+ column("bookcase_owner_id", Integer),
+ name="bookcases",
+ )
+ .data([(1, 1), (2, 1), (3, 2), (3, 3)])
+ .lateral()
+ )
+ stmt = select([people, values]).select_from(
+ people.join(values, true())
+ )
+ self.assert_compile(
+ stmt,
+ "SELECT people.people_id, people.age, people.name, "
+ "bookcases.bookcase_id, bookcases.bookcase_owner_id FROM people "
+ "JOIN LATERAL (VALUES (:param_1, :param_2), (:param_3, :param_4), "
+ "(:param_5, :param_6), (:param_7, :param_8)) AS bookcases "
+ "(bookcase_id, bookcase_owner_id) "
+ "ON true",
+ checkparams={
+ "param_1": 1,
+ "param_2": 1,
+ "param_3": 2,
+ "param_4": 1,
+ "param_5": 3,
+ "param_6": 2,
+ "param_7": 3,
+ "param_8": 3,
+ },
+ )
+
+ def test_from_linting_named(self):
+ people = self.tables.people
+ values = Values(
+ column("bookcase_id", Integer),
+ column("bookcase_owner_id", Integer),
+ name="bookcases",
+ ).data([(1, 1), (2, 1), (3, 2), (3, 3)])
+ stmt = select([people, values])
+
+ with testing.expect_warnings(
+ r"SELECT statement has a cartesian product between FROM "
+ r'element\(s\) "(?:bookcases|people)" and '
+ r'FROM element "(?:people|bookcases)"'
+ ):
+ stmt.compile(linting=FROM_LINTING)
+
+ def test_from_linting_unnamed(self):
+ people = self.tables.people
+ values = Values(
+ column("bookcase_id", Integer),
+ column("bookcase_owner_id", Integer),
+ ).data([(1, 1), (2, 1), (3, 2), (3, 3)])
+ stmt = select([people, values])
+
+ with testing.expect_warnings(
+ r"SELECT statement has a cartesian product between FROM "
+ r'element\(s\) "(?:\(unnamed VALUES element\)|people)" and '
+ r'FROM element "(?:people|\(unnamed VALUES element\))"'
+ ):
+ stmt.compile(linting=FROM_LINTING)