From: Mike Bayer
Date: Tue, 17 Dec 2013 19:03:20 +0000 (-0500)
Subject: Merge branch 'issue_2581' of github.com:nathan-rice/sqlalchemy into pg_json
X-Git-Tag: rel_0_9_0~26^2~6
X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=2c3afb4dcb6bedd5189ec7e5e25afaa4636be4c9;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git

Merge branch 'issue_2581' of github.com:nathan-rice/sqlalchemy into pg_json
---

2c3afb4dcb6bedd5189ec7e5e25afaa4636be4c9
diff --cc lib/sqlalchemy/dialects/postgresql/__init__.py
index 3c1d19504d,00bbc7268d..728f1629fb
--- a/lib/sqlalchemy/dialects/postgresql/__init__.py
+++ b/lib/sqlalchemy/dialects/postgresql/__init__.py
@@@ -11,10 -11,10 +11,11 @@@ base.dialect = psycopg2.dialec
  from .base import \
      INTEGER, BIGINT, SMALLINT, VARCHAR, CHAR, TEXT, NUMERIC, FLOAT, REAL, \
      INET, CIDR, UUID, BIT, MACADDR, DOUBLE_PRECISION, TIMESTAMP, TIME, \
-     DATE, BYTEA, BOOLEAN, INTERVAL, ARRAY, ENUM, dialect, array, Any, All
+     DATE, BYTEA, BOOLEAN, INTERVAL, ARRAY, ENUM, dialect, array, Any, All, \
+     TSVECTOR
  from .constraints import ExcludeConstraint
  from .hstore import HSTORE, hstore
+ from .pgjson import JSON
  from .ranges import INT4RANGE, INT8RANGE, NUMRANGE, DATERANGE, TSRANGE, \
      TSTZRANGE
diff --cc lib/sqlalchemy/dialects/postgresql/psycopg2.py
index c3c749523c,0dbdfe8fbd..4a9248e5ff
--- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py
+++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
@@@ -324,7 -347,9 +337,8 @@@ class PGDialect_psycopg2(PGDialect)
              sqltypes.Numeric: _PGNumeric,
              ENUM: _PGEnum,  # needs force_unicode
              sqltypes.Enum: _PGEnum,  # needs force_unicode
-             ARRAY: _PGArray,  # needs force_unicode
              HSTORE: _PGHStore,
+             JSON: _PGJSON
          }
      )
diff --cc test/dialect/postgresql/test_types.py
index 6e8609448b,c7a973e4e0..19df131fd3
--- a/test/dialect/postgresql/test_types.py
+++ b/test/dialect/postgresql/test_types.py
@@@ -1663,3 -1652,191 +1664,191 @@@ class DateTimeTZRangeTests(_RangeTypeMi
      def _data_obj(self):
          return self.extras.DateTimeTZRange(*self.tstzs())
+
+
+ class JSONTest(fixtures.TestBase):
+     def _assert_sql(self, construct, expected):
+         dialect = postgresql.dialect()
+         compiled = str(construct.compile(dialect=dialect))
+         compiled = re.sub(r'\s+', ' ', compiled)
+         expected = re.sub(r'\s+', ' ', expected)
+         eq_(compiled, expected)
+
+     def setup(self):
+         metadata = MetaData()
+         self.test_table = Table('test_table', metadata,
+             Column('id', Integer, primary_key=True),
+             Column('test_column', JSON)
+         )
+         self.jsoncol = self.test_table.c.test_column
+
+     def _test_where(self, whereclause, expected):
+         stmt = select([self.test_table]).where(whereclause)
+         self._assert_sql(
+             stmt,
+             "SELECT test_table.id, test_table.test_column FROM test_table "
+             "WHERE %s" % expected
+         )
+
+     def _test_cols(self, colclause, expected, from_=True):
+         stmt = select([colclause])
+         self._assert_sql(
+             stmt,
+             (
+                 "SELECT %s" +
+                 (" FROM test_table" if from_ else "")
+             ) % expected
+         )
+
+     def test_bind_serialize_default(self):
+         from sqlalchemy.engine import default
+
+         dialect = default.DefaultDialect()
+         proc = self.test_table.c.test_column.type._cached_bind_processor(dialect)
+         eq_(
+             proc({"A": [1, 2, 3, True, False]}),
+             '{"A": [1, 2, 3, true, false]}'
+         )
+
+     def test_result_deserialize_default(self):
+         from sqlalchemy.engine import default
+
+         dialect = default.DefaultDialect()
+         proc = self.test_table.c.test_column.type._cached_result_processor(
+             dialect, None)
+         eq_(
+             proc('{"A": [1, 2, 3, true, false]}'),
+             {"A": [1, 2, 3, True, False]}
+         )
+
+     # This test is a bit misleading -- in real life you will need to cast to do anything
+     def test_where_getitem(self):
+         self._test_where(
+             self.jsoncol['bar'] == None,
+             "(test_table.test_column -> %(test_column_1)s) IS NULL"
+         )
+
+     def test_where_path(self):
+         self._test_where(
+             self.jsoncol.get_path('{"foo", 1}') == None,
+             "(test_table.test_column #> %(test_column_1)s) IS NULL"
+         )
+
+     def test_where_getitem_as_text(self):
+         self._test_where(
+             self.jsoncol.get_item_as_text('bar') == None,
+             "(test_table.test_column ->> %(test_column_1)s) IS NULL"
+         )
+
+     def test_where_path_as_text(self):
+         self._test_where(
+             self.jsoncol.get_path_as_text('{"foo", 1}') == None,
+             "(test_table.test_column #>> %(test_column_1)s) IS NULL"
+         )
+
+     def test_cols_get(self):
+         self._test_cols(
+             self.jsoncol['foo'],
+             "test_table.test_column -> %(test_column_1)s AS anon_1",
+             True
+         )
+
+
+ class JSONRoundTripTest(fixtures.TablesTest):
+     __only_on__ = 'postgresql'
+
+     @classmethod
+     def define_tables(cls, metadata):
+         Table('data_table', metadata,
+             Column('id', Integer, primary_key=True),
+             Column('name', String(30), nullable=False),
+             Column('data', JSON)
+         )
+
+     def _fixture_data(self, engine):
+         data_table = self.tables.data_table
+         engine.execute(
+             data_table.insert(),
+             {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}},
+             {'name': 'r2', 'data': {"k1": "r2v1", "k2": "r2v2"}},
+             {'name': 'r3', 'data': {"k1": "r3v1", "k2": "r3v2"}},
+             {'name': 'r4', 'data': {"k1": "r4v1", "k2": "r4v2"}},
+             {'name': 'r5', 'data': {"k1": "r5v1", "k2": "r5v2"}},
+         )
+
+     def _assert_data(self, compare):
+         data = testing.db.execute(
+             select([self.tables.data_table.c.data]).
+                 order_by(self.tables.data_table.c.name)
+         ).fetchall()
+         eq_([d for d, in data], compare)
+
+     def _test_insert(self, engine):
+         engine.execute(
+             self.tables.data_table.insert(),
+             {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}
+         )
+         self._assert_data([{"k1": "r1v1", "k2": "r1v2"}])
+
+     def _non_native_engine(self):
+         if testing.against("postgresql+psycopg2"):
+             engine = engines.testing_engine()
+         else:
+             engine = testing.db
+         engine.connect()
+         return engine
+
+     def test_reflect(self):
+         from sqlalchemy import inspect
+         insp = inspect(testing.db)
+         cols = insp.get_columns('data_table')
+         assert isinstance(cols[2]['type'], JSON)
+
+     @testing.only_on("postgresql+psycopg2")
+     def test_insert_native(self):
+         engine = testing.db
+         self._test_insert(engine)
+
+     def test_insert_python(self):
+         engine = self._non_native_engine()
+         self._test_insert(engine)
+
+     @testing.only_on("postgresql+psycopg2")
+     def test_criterion_native(self):
+         engine = testing.db
+         self._fixture_data(engine)
+         self._test_criterion(engine)
+
+     def test_criterion_python(self):
+         engine = self._non_native_engine()
+         self._fixture_data(engine)
+         self._test_criterion(engine)
+
+     def test_path_query(self):
+         engine = testing.db
+         self._fixture_data(engine)
+         data_table = self.tables.data_table
+         result = engine.execute(
+             select([data_table.c.data]).where(
+                 data_table.c.data.get_path_as_text('{k1}') == 'r3v1'
+             )
+         ).first()
+         eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},))
+
+     def test_query_returned_as_text(self):
+         engine = testing.db
+         self._fixture_data(engine)
+         data_table = self.tables.data_table
+         result = engine.execute(
+             select([data_table.c.data.get_item_as_text('k1')])
+         ).first()
+         assert isinstance(result[0], basestring)
+
+     def _test_criterion(self, engine):
+         data_table = self.tables.data_table
+         result = engine.execute(
+             select([data_table.c.data]).where(
+                 data_table.c.data.get_item_as_text('k1') == 'r3v1'
+             )
+         ).first()
-         eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},))
++        eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},))
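
A usage sketch mirroring the expressions exercised in JSONTest and JSONRoundTripTest above, assuming a SQLAlchemy checkout that includes this merge. It only compiles statements against the PostgreSQL dialect, so no database connection is needed; the accessor names get_path, get_item_as_text and get_path_as_text are taken from the tests in this diff and may not match later releases.

# Minimal sketch of the JSON expression API exercised by the tests in this
# merge. Statements are only compiled to SQL strings, so no database is
# needed. The accessors get_item_as_text / get_path_as_text are the ones
# added on this branch; they are assumptions tied to this revision and may
# differ in later SQLAlchemy releases.
from sqlalchemy import Column, Integer, MetaData, Table, select
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import JSON

metadata = MetaData()
data_table = Table('data_table', metadata,
    Column('id', Integer, primary_key=True),
    Column('data', JSON)
)

pg = postgresql.dialect()

# '[]' compiles to the '->' operator (JSON value for a key)
print(select([data_table.c.data['k1']]).compile(dialect=pg))

# get_item_as_text() compiles to '->>' (value as text), usable in comparisons
print(select([data_table.c.data]).where(
    data_table.c.data.get_item_as_text('k1') == 'r3v1'
).compile(dialect=pg))

# get_path_as_text() compiles to '#>>' (text at a path such as '{k1}')
print(select([data_table.c.data]).where(
    data_table.c.data.get_path_as_text('{k1}') == 'r3v1'
).compile(dialect=pg))

Per the expected SQL strings in JSONTest, the full operator mapping is: [] renders ->, get_path() renders #>, get_item_as_text() renders ->>, and get_path_as_text() renders #>>.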