--- /dev/null
+.. change::
+ :tags: bug, orm
+ :tickets: 9418
+
+ Fixed regression involving pickling of Python rows between the cython and
+ pure Python implementations of :class:`.Row`, which occurred as part of
+ refactoring code for version 2.0 with typing. A particular constant were
+ turned into a string based ``Enum`` for the pure Python version of
+ :class:`.Row` whereas the cython version continued to use an integer
+ constant, leading to deserialization failures.
MD_INDEX = 0 # integer index in cursor.description
-class _KeyStyle(enum.Enum):
+class _KeyStyle(enum.IntEnum):
KEY_INTEGER_ONLY = 0
"""__getitem__ only allows integer values and slices, raises TypeError
otherwise"""
mdindex = rec[MD_INDEX]
if mdindex is None:
self._parent._raise_for_ambiguous_column_name(rec)
+ # NOTE: keep "== KEY_OBJECTS_ONLY" instead of "is KEY_OBJECTS_ONLY"
+ # since deserializing the class from cython will load an int in
+ # _key_style, not an instance of _KeyStyle
elif self._key_style == KEY_OBJECTS_ONLY and isinstance(key, int):
raise KeyError(key)
# yes, this thing needs this much testing
for pickle_ in picklers:
- for protocol in range(-2, pickle.HIGHEST_PROTOCOL):
+ for protocol in range(-2, pickle.HIGHEST_PROTOCOL + 1):
yield pickle_.loads, lambda d: pickle_.dumps(d, protocol)
eq_(kt._asdict(), {"a": 1, "b": 3})
@testing.requires.cextensions
- def test_serialize_cy_py_cy(self):
- from sqlalchemy.engine._py_row import BaseRow as _PyRow
- from sqlalchemy.cyextension.resultproxy import BaseRow as _CyRow
+ @testing.variation("direction", ["py_to_cy", "cy_to_py"])
+ def test_serialize_cy_py_cy(self, direction: testing.Variation):
+ from sqlalchemy.engine import _py_row
+ from sqlalchemy.cyextension import resultproxy as _cy_row
global Row
- p = result.SimpleResultMetaData(["a", None, "b"])
+ p = result.SimpleResultMetaData(["a", "w", "b"])
+
+ if direction.py_to_cy:
+ dump_cls = _py_row.BaseRow
+ num = _py_row.KEY_INTEGER_ONLY
+ load_cls = _cy_row.BaseRow
+ elif direction.cy_to_py:
+ dump_cls = _cy_row.BaseRow
+ num = _cy_row.KEY_INTEGER_ONLY
+ load_cls = _py_row.BaseRow
+ else:
+ direction.fail()
for loads, dumps in picklers():
- class Row(_CyRow):
+ class Row(dump_cls):
pass
- row = Row(p, p._processors, p._keymap, 0, (1, 2, 3))
+ row = Row(p, p._processors, p._keymap, num, (1, 2, 3))
state = dumps(row)
- class Row(_PyRow):
+ class Row(load_cls):
pass
row2 = loads(state)
- is_true(isinstance(row2, _PyRow))
+ is_true(isinstance(row2, load_cls))
+ is_false(isinstance(row2, dump_cls))
state2 = dumps(row2)
- class Row(_CyRow):
+ class Row(dump_cls):
pass
row3 = loads(state2)
- is_true(isinstance(row3, _CyRow))
+ is_true(isinstance(row3, dump_cls))
class ResultTest(fixtures.TestBase):
s = pickle.dumps(sym1)
pickle.loads(s)
- for protocol in 0, 1, 2:
- print(protocol)
- serial = pickle.dumps(sym1)
+ for _, dumper in picklers():
+ serial = dumper(sym1)
rt = pickle.loads(serial)
assert rt is sym1
assert rt is sym2
import csv
from io import StringIO
import operator
+import os
import pickle
+import subprocess
+import sys
+from tempfile import mkstemp
from unittest.mock import Mock
from unittest.mock import patch
lambda: result[0]._mapping[addresses.c.address_id],
)
- # @testing.variation("use_labels", [True, False])
- # def _dont_test_pickle_rows_other_process(self, connection, use_labels):
- # result = self._pickle_row_data(connection, use_labels)
-
- # f, name = mkstemp("pkl")
- # with os.fdopen(f, "wb") as f:
- # pickle.dump(result, f)
- # name = name.replace(os.sep, "/")
- # code = (
- # "import sqlalchemy; import pickle; print(["
- # f"r[0] for r in pickle.load(open('''{name}''', 'rb'))])"
- # )
- # proc = subprocess.run(
- # [sys.executable, "-c", code], stdout=subprocess.PIPE
- # )
- # exp = str([r[0] for r in result]).encode()
- # eq_(proc.returncode, 0)
- # eq_(proc.stdout.strip(), exp)
- # os.unlink(name)
+ @testing.variation("use_labels", [True, False])
+ def test_pickle_rows_other_process(self, connection, use_labels):
+ result = self._pickle_row_data(connection, use_labels)
+
+ f, name = mkstemp("pkl")
+ with os.fdopen(f, "wb") as f:
+ pickle.dump(result, f)
+ name = name.replace(os.sep, "/")
+ code = (
+ "import sqlalchemy; import pickle; print(["
+ f"r[0] for r in pickle.load(open('''{name}''', 'rb'))])"
+ )
+ proc = subprocess.run(
+ [sys.executable, "-c", code], stdout=subprocess.PIPE
+ )
+ exp = str([r[0] for r in result]).encode()
+ eq_(proc.returncode, 0)
+ eq_(proc.stdout.strip(), exp)
+ os.unlink(name)
def test_column_error_printing(self, connection):
result = connection.execute(select(1))