# Copyright (C) 2020-2021 The Psycopg Team
import struct
+from math import log
from typing import Any, Callable, DefaultDict, Dict, Tuple, Union, cast
from decimal import Decimal, DefaultContext, Context
return _pack_int8(obj)
+# Ratio between number of bits required to store a number and number of pg
+# decimal digits required.
+BIT_PER_PGDIGIT = log(2) / log(10_000)
+
+
class IntNumericBinaryDumper(IntNumericDumper):
format = Format.BINARY
- def dump(self, obj: int) -> bytes:
- raise NotImplementedError("binary decimal dump not implemented yet")
+ def dump(self, obj: int) -> bytearray:
+ ndigits = int(obj.bit_length() * BIT_PER_PGDIGIT) + 1
+ out = bytearray(b"\x00\x00" * (ndigits + 4))
+ if obj < 0:
+ sign = NUMERIC_NEG
+ obj = -obj
+ else:
+ sign = NUMERIC_POS
+
+ out[:8] = _pack_numeric_head(ndigits, ndigits - 1, sign, 0)
+ i = 8 + (ndigits - 1) * 2
+ while obj:
+ rem = obj % 10_000
+ obj //= 10_000
+ out[i : i + 2] = _pack_uint2(rem)
+ i -= 2
+
+ return out
class OidBinaryDumper(OidDumper):
return sizeof(int64_t)
+# Ratio between number of bits required to store a number and number of pg
+# decimal digits required.
+DEF BIT_PER_PGDIGIT = 0.07525749891599529 # log(2) / log(10_000)
+
+DEF NUMERIC_POS = 0x0000
+DEF NUMERIC_NEG = 0x4000
+
+
@cython.final
cdef class IntNumericBinaryDumper(CDumper):
self.oid = oids.NUMERIC_OID
cdef Py_ssize_t cdump(self, obj, bytearray rv, Py_ssize_t offset) except -1:
- raise NotImplementedError("binary decimal dump not implemented yet")
+ # Calculate the number of PG digits required to store the number
+ cdef uint16_t ndigits
+ ndigits = <uint16_t>((<int>obj.bit_length()) * BIT_PER_PGDIGIT) + 1
+
+ cdef uint16_t sign = NUMERIC_POS
+ if obj < 0:
+ sign = NUMERIC_NEG
+ obj = -obj
+
+ cdef Py_ssize_t length = sizeof(uint16_t) * (ndigits + 4)
+ cdef uint16_t *buf
+ buf = <uint16_t *><void *>CDumper.ensure_size(rv, offset, length)
+ buf[0] = endian.htobe16(ndigits)
+ buf[1] = endian.htobe16(ndigits - 1) # weight
+ buf[2] = endian.htobe16(sign)
+ buf[3] = 0 # dscale
+
+ cdef int i = 4 + ndigits - 1
+ cdef uint16_t rem
+ while obj:
+ rem = obj % 10000
+ obj //= 10000
+ buf[i] = endian.htobe16(rem)
+ i -= 1
+ while i > 3:
+ buf[i] = 0
+ i -= 1
+
+ return length
cdef class IntDumper(_NumberDumper):
assert got == want
def make_int(self, spec):
- return randrange(-(1 << 63), 1 << 63)
+ return randrange(-(1 << 90), 1 << 90)
def make_Int2(self, spec):
return spec(randrange(-(1 << 15), 1 << 15))
)
@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
def test_dump_int_subtypes(conn, val, expr, fmt_in):
- if fmt_in in (Format.AUTO, Format.BINARY) and "numeric" in expr:
- pytest.xfail("binary numeric not implemented")
cur = conn.cursor()
cur.execute(f"select pg_typeof({expr}) = pg_typeof(%{fmt_in})", (val,))
assert cur.fetchone()[0] is True
- cur.execute(f"select {expr} = %{fmt_in}", (val,))
- assert cur.fetchone()[0] is True
+ cur.execute(
+ f"select {expr} = %(v){fmt_in}, {expr}::text, %(v){fmt_in}::text",
+ {"v": val},
+ )
+ ok, want, got = cur.fetchone()
+ assert got == want
+ assert ok
@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
(-42, b" -42"),
(int(2 ** 63 - 1), b"9223372036854775807"),
(int(-(2 ** 63)), b" -9223372036854775808"),
+ (int(2 ** 63), b"9223372036854775808"),
+ (int(-(2 ** 63 + 1)), b" -9223372036854775809"),
+ (int(2 ** 100), b"1267650600228229401496703205376"),
+ (int(-(2 ** 100)), b" -1267650600228229401496703205376"),
],
)
def test_quote_int(conn, val, expr):