load_options = active_options = orm_context.load_options
update_options = None
- else:
+ elif orm_context.is_update or orm_context.is_delete:
load_options = None
update_options = active_options = orm_context.update_delete_options
+ else:
+ load_options = update_options = active_options = None
session = orm_context.session
if orm_context.is_select:
load_options += {"_refresh_identity_token": shard_id}
execution_options["_sa_orm_load_options"] = load_options
- else:
+ elif orm_context.is_update or orm_context.is_delete:
update_options += {"_refresh_identity_token": shard_id}
execution_options["_sa_orm_update_options"] = update_options
bind_arguments=bind_arguments, execution_options=execution_options
)
- if active_options._refresh_identity_token is not None:
+ if active_options and active_options._refresh_identity_token is not None:
shard_id = active_options._refresh_identity_token
elif "_sa_shard_id" in orm_context.execution_options:
shard_id = orm_context.execution_options["_sa_shard_id"]
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
+from sqlalchemy import text
from sqlalchemy import update
from sqlalchemy import util
from sqlalchemy.ext.horizontal_shard import ShardedSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import SingletonThreadPool
from sqlalchemy.sql import operators
+from sqlalchemy.sql import Select
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
for value in binary.right.value:
ids.append(shard_lookup[value])
- if query.whereclause is not None:
+ if isinstance(query, Select) and query.whereclause is not None:
FindContinent().traverse(query.whereclause)
if len(ids) == 0:
return ["north_america", "asia", "europe", "south_america"]
for i in range(1, 5):
os.remove("shard%d_%s.db" % (i, provision.FOLLOWER_IDENT))
+ @testing.combinations((True,), (False,))
+ @testing.uses_deprecated("Using plain strings")
+ def test_plain_core_textual_lookup_w_shard(self, use_legacy_text):
+ sess = self._fixture_data()
+
+ if use_legacy_text:
+ stmt = "SELECT * FROM weather_locations"
+ else:
+ stmt = text("SELECT * FROM weather_locations")
+
+ eq_(
+ sess.execute(stmt, shard_id="asia").fetchall(),
+ [(1, "Asia", "Tokyo")],
+ )
+
+ @testing.combinations((True,), (False,))
+ @testing.uses_deprecated("Using plain strings")
+ def test_plain_core_textual_lookup(self, use_legacy_text):
+ sess = self._fixture_data()
+
+ if use_legacy_text:
+ stmt = "SELECT * FROM weather_locations WHERE id=1"
+ else:
+ stmt = text("SELECT * FROM weather_locations WHERE id=1")
+ eq_(
+ sess.execute(stmt).fetchall(),
+ [(1, "Asia", "Tokyo")],
+ )
+
class AttachedFileShardTest(ShardTest, fixtures.TestBase):
"""Use modern schema conventions along with SQLite ATTACH."""