rc.append(self.get_loader(oid, fmt).load)
def get_dumper(self, obj: Any, format: Format) -> "Dumper":
- key = (type(obj), format)
+ # Fast path: return a Dumper class already instantiated from the same type
+ cls = type(obj)
try:
- return self._dumpers_cache[key]
+ return self._dumpers_cache[cls, format]
except KeyError:
pass
- for amap in self._dumpers_maps:
- if key in amap:
- dumper_cls = amap[key]
- break
- else:
- raise e.ProgrammingError(
- f"cannot adapt type {type(obj).__name__}"
- f" to format {Format(format).name}"
- )
-
- self._dumpers_cache[key] = dumper = dumper_cls(key[0], self)
- return dumper
+ # We haven't seen this type in this query yet. Look for an adapter
+ # in contexts from the most specific to the most generic.
+ # Also look for superclasses: if you can adapt a type you should be
+ # able to adapt its subtypes, otherwise Liskov is sad.
+ for dmap in self._dumpers_maps:
+ for scls in cls.__mro__:
+ key = (scls, format)
+ if key in dmap:
+ self._dumpers_cache[key] = dumper = dmap[key](scls, self)
+ return dumper
+
+ raise e.ProgrammingError(
+ f"cannot adapt type {type(obj).__name__}"
+ f" to format {Format(format).name}"
+ )
def load_row(self, row: int) -> Optional[Tuple[Any, ...]]:
res = self.pgresult
return row_loader
def get_dumper(self, obj: Any, format: Format) -> "Dumper":
- key = (type(obj), format)
+ # Fast path: return a Dumper class already instantiated from the same type
+ cls = type(obj)
try:
- return self._dumpers_cache[key]
+ return self._dumpers_cache[cls, format]
except KeyError:
pass
- for amap in self._dumpers_maps:
- if key in amap:
- dumper_cls = amap[key]
- break
- else:
- raise e.ProgrammingError(
- f"cannot adapt type {type(obj).__name__}"
- f" to format {Format(format).name}"
- )
-
- self._dumpers_cache[key] = dumper = dumper_cls(key[0], self)
- return dumper
+ # We haven't seen this type in this query yet. Look for an adapter
+ # in contexts from the most specific to the most generic.
+ # Also look for superclasses: if you can adapt a type you should be
+ # able to adapt its subtypes, otherwise Liskov is sad.
+ for dmap in self._dumpers_maps:
+ for scls in cls.__mro__:
+ key = (scls, format)
+ if key in dmap:
+ self._dumpers_cache[key] = dumper = dmap[key](scls, self)
+ return dumper
+
+ raise e.ProgrammingError(
+ f"cannot adapt type {type(obj).__name__}"
+ f" to format {Format(format).name}"
+ )
def load_row(self, row: int) -> Optional[Tuple[Any, ...]]:
if self._pgresult is None:
assert cur.fetchone() == ("hellot", "worldb")
+@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+def test_dump_subclass(conn, fmt_out):
+ class MyString(str):
+ pass
+
+ cur = conn.cursor()
+ cur.execute("select %s, %b", [MyString("hello"), MyString("world")])
+ assert cur.fetchone() == ("hello", "world")
+
+
@pytest.mark.parametrize(
"data, format, type, result",
[