for i, column in enumerate(
expression._select_iterables(returning_cols)
):
- if self.isupdate and isinstance(column.server_default, Computed):
+ if (
+ self.isupdate
+ and isinstance(column.server_default, Computed)
+ and not self.dialect._supports_update_returning_computed_cols
+ ):
util.warn(
"Computed columns don't work with Oracle UPDATE "
"statements that use RETURNING; the value of the column "
def _supports_char_length(self):
return not self._is_oracle_8
+ @property
+ def _supports_update_returning_computed_cols(self):
+ # on version 18 this error is no longet present while it happens on 11
+ # it may work also on versions before the 18
+ return self.server_version_info and self.server_version_info >= (18,)
+
def do_release_savepoint(self, connection, name):
# Oracle does not support RELEASE SAVEPOINT
pass
from ...testing.provision import follower_url_from_main
from ...testing.provision import log
from ...testing.provision import run_reap_dbs
+from ...testing.provision import stop_test_class
from ...testing.provision import temp_table_keyword_args
from ...testing.provision import update_db_opts
pass
+@stop_test_class.for_db("oracle")
+def stop_test_class(config, db, cls):
+ """run magic command to get rid of identity sequences
+
+ # https://floo.bar/2019/11/29/drop-the-underlying-sequence-of-an-identity-column/
+
+ """
+
+ with db.begin() as conn:
+ conn.execute("purge recyclebin")
+
+
@run_reap_dbs.for_db("oracle")
def _reap_oracle_dbs(url, idents):
log.info("db reaper connecting to %r", url)
exclusions = None
warnings = None
profiling = None
+provision = None
assertions = None
requirements = None
config = None
fn(options, file_config)
# late imports, has to happen after config.
- global util, fixtures, engines, exclusions, assertions
+ global util, fixtures, engines, exclusions, assertions, provision
global warnings, profiling, config, testing
from sqlalchemy import testing # noqa
from sqlalchemy.testing import fixtures, engines, exclusions # noqa
from sqlalchemy.testing import assertions, warnings, profiling # noqa
- from sqlalchemy.testing import config # noqa
+ from sqlalchemy.testing import config, provision # noqa
from sqlalchemy import util # noqa
warnings.setup_filters()
def stop_test_class(cls):
# from sqlalchemy import inspect
# assert not inspect(testing.db).get_table_names()
+
+ provision.stop_test_class(config, config.db, cls)
engines.testing_reaper._stop_test_ctx()
try:
if not options.low_connections:
raise NotImplementedError(
"no temp table keyword args routine for cfg: %s" % eng.url
)
+
+
+@register.init
+def stop_test_class(config, db, testcls):
+ pass
warnings.filterwarnings(
"ignore",
category=sa_exc.SAWarning,
- message=r"Oracle compatibility version .* is known to have a "
- "maximum identifier",
+ message=r"Oracle version .* is known to have a " "maximum identifier",
)
# some selected deprecations...
eq_(conn.scalar(select([test.c.bar])), 47)
- def test_computed_update_warning(self):
+ def test_computed_update_warning(self, connection):
test = self.tables.test
- with testing.db.connect() as conn:
- conn.execute(test.insert(), {"id": 1, "foo": 5})
+ conn = connection
+ conn.execute(test.insert(), {"id": 1, "foo": 5})
+ if testing.db.dialect._supports_update_returning_computed_cols:
+ result = conn.execute(
+ test.update().values(foo=10).return_defaults()
+ )
+ eq_(result.returned_defaults, (52,))
+ else:
with testing.expect_warnings(
"Computed columns don't work with Oracle UPDATE"
):
# returns the *old* value
eq_(result.returned_defaults, (47,))
- eq_(conn.scalar(select([test.c.bar])), 52)
+ eq_(conn.scalar(select([test.c.bar])), 52)
def test_computed_update_no_warning(self):
test = self.tables.test_no_returning
@testing.provide_metadata
def test_table_round_trip(self):
- oracle.RESERVED_WORDS.remove("UNION")
+ oracle.RESERVED_WORDS.discard("UNION")
metadata = self.metadata
table = Table(
def all_tables_compression_missing():
- try:
- testing.db.execute("SELECT compression FROM all_tables")
- if "Enterprise Edition" not in testing.db.scalar(
- "select * from v$version"
- ):
+ with testing.db.connect() as conn:
+ if (
+ "Enterprise Edition"
+ not in conn.execute("select * from v$version").scalar()
+ # this works in Oracle Database 18c Express Edition Release
+ ) and testing.db.dialect.server_version_info < (18,):
return True
return False
- except Exception:
- return True
def all_tables_compress_for_missing():
- try:
- testing.db.execute("SELECT compress_for FROM all_tables")
- if "Enterprise Edition" not in testing.db.scalar(
- "select * from v$version"
+ with testing.db.connect() as conn:
+ if (
+ "Enterprise Edition"
+ not in conn.execute("select * from v$version").scalar()
):
return True
return False
- except Exception:
- return True
class TableReflectionTest(fixtures.TestBase):