# coding: utf-8
+import weakref
from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \
config, is_, is_not_, le_, expect_warnings
import re
from sqlalchemy.testing.util import picklers
+from sqlalchemy.testing.util import gc_collect
from sqlalchemy.interfaces import ConnectionProxy
from sqlalchemy import MetaData, Integer, String, INT, VARCHAR, func, \
- bindparam, select, event, TypeDecorator, create_engine, Sequence
+ bindparam, select, event, TypeDecorator, create_engine, Sequence, \
+ LargeBinary
from sqlalchemy.sql import column, literal
from sqlalchemy.testing.schema import Table, Column
import sqlalchemy as tsa
users = Table(
'users', metadata,
Column('user_id', INT, primary_key=True, autoincrement=False),
- Column('user_name', VARCHAR(20)),
+ Column('user_name', VARCHAR(20))
)
users_autoinc = Table(
'users_autoinc', metadata,
assert len(cache) == 1
eq_(conn.execute("select count(*) from users").scalar(), 3)
+ @testing.only_on(["sqlite", "mysql", "postgresql"],
+ "uses blob value that is problematic for some DBAPIs")
+ @testing.provide_metadata
+ def test_cache_noleak_on_statement_values(self):
+ # This is a non regression test for an object reference leak caused
+ # by the compiled_cache.
+
+ metadata = self.metadata
+ photo = Table(
+ 'photo', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('photo_blob', LargeBinary()),
+ )
+ metadata.create_all()
+
+ conn = testing.db.connect()
+ cache = {}
+ cached_conn = conn.execution_options(compiled_cache=cache)
+
+ class PhotoBlob(bytearray):
+ pass
+
+ blob = PhotoBlob(100)
+ ref_blob = weakref.ref(blob)
+
+ ins = photo.insert()
+ with patch.object(
+ ins, "compile",
+ Mock(side_effect=ins.compile)) as compile_mock:
+ cached_conn.execute(ins, {'photo_blob': blob})
+ eq_(compile_mock.call_count, 1)
+ eq_(len(cache), 1)
+ eq_(conn.execute("select count(*) from photo").scalar(), 1)
+
+ del blob
+
+ gc_collect()
+
+ # The compiled statement cache should not hold any reference to the
+ # the statement values (only the keys).
+ eq_(ref_blob(), None)
+
def test_keys_independent_of_ordering(self):
conn = testing.db.connect()
conn.execute(