# Copyright (C) 2020 The Psycopg Team
+from typing import Any
+
+from cpython.bytes cimport PyBytes_AsStringAndSize
+
from psycopg3_c.adapt cimport cloader_func, get_context_func
+from psycopg3_c cimport libpq as impl
import logging
logger = logging.getLogger("psycopg3.adapt")
+# TODO: rename after dropping CLoader below
+cdef class PyxLoader:
+ cdef impl.Oid oid
+ cdef impl.PGconn* pgconn_ptr
+ cdef object context
+
+ def __init__(self, oid: int, context: "AdaptContext" = None):
+ from psycopg3.adapt import _connection_from_context
+
+ self.oid = oid
+ self.context = context
+ connection = _connection_from_context(context)
+ # TODO
+ self.pgconn_ptr = NULL
+
+ cdef object cload(self, const char *data, size_t length):
+ raise NotImplementedError()
+
+ def load(self, data: bytes) -> Any:
+ cdef char *buffer
+ cdef Py_ssize_t length
+ PyBytes_AsStringAndSize(data, &buffer, &length)
+ return self.cload(data, length)
+
+
cdef class CLoader:
cdef object pyloader
cdef cloader_func cloader
return data[:length]
+cdef class BinaryByteaLoader(PyxLoader):
+ cdef object cload(self, const char *data, size_t length):
+ return data[:length]
+
+
cdef void register_text_c_loaders():
logger.debug("registering optimised text c loaders")
from psycopg3.types import text