From: Daniele Varrazzo Date: Wed, 4 Nov 2020 17:08:34 +0000 (+0100) Subject: Added network types X-Git-Tag: 3.0.dev0~399 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f9c23d23e32203dfb88c4aa9c1e5926dcb5ea087;p=thirdparty%2Fpsycopg.git Added network types --- diff --git a/psycopg3/psycopg3/types/__init__.py b/psycopg3/psycopg3/types/__init__.py index a75bbc40a..80c0f3345 100644 --- a/psycopg3/psycopg3/types/__init__.py +++ b/psycopg3/psycopg3/types/__init__.py @@ -8,8 +8,8 @@ psycopg3 types package from ..oids import builtins # Register default adapters -from . import array, composite, date, json, numeric, singletons, text # noqa -from . import uuid # noqa +from . import array, composite, date, json, network, numeric # noqa +from . import singletons, text, uuid # noqa # Register associations with array oids array.register_all_arrays() diff --git a/psycopg3/psycopg3/types/network.py b/psycopg3/psycopg3/types/network.py new file mode 100644 index 000000000..42a7ae3a5 --- /dev/null +++ b/psycopg3/psycopg3/types/network.py @@ -0,0 +1,57 @@ +""" +Adapters for network types. +""" + +# Copyright (C) 2020 The Psycopg Team + +# TODO: consiter lazy dumper registration. +from ipaddress import ip_address, ip_interface, ip_network +from ipaddress import IPv4Address, IPv4Interface, IPv4Network +from ipaddress import IPv6Address, IPv6Interface, IPv6Network + +from typing import cast, Union + +from ..oids import builtins +from ..adapt import Dumper, Loader +from ..utils.codecs import encode_ascii, decode_ascii + +Address = Union[IPv4Address, IPv6Address] +Interface = Union[IPv4Interface, IPv6Interface] +Network = Union[IPv4Network, IPv6Network] + + +@Dumper.text(IPv4Address) +@Dumper.text(IPv6Address) +@Dumper.text(IPv4Interface) +@Dumper.text(IPv6Interface) +class InterfaceDumper(Dumper): + + oid = builtins["inet"].oid + + def dump(self, obj: Interface) -> bytes: + return encode_ascii(str(obj))[0] + + +@Dumper.text(IPv4Network) +@Dumper.text(IPv6Network) +class NetworkDumper(Dumper): + + oid = builtins["cidr"].oid + + def dump(self, obj: Network) -> bytes: + return encode_ascii(str(obj))[0] + + +@Loader.text(builtins["inet"].oid) +class InetLoader(Loader): + def load(self, data: bytes) -> Union[Address, Interface]: + if b"/" in data: + return cast(Interface, ip_interface(decode_ascii(data)[0])) + else: + return cast(Address, ip_address(decode_ascii(data)[0])) + + +@Loader.text(builtins["cidr"].oid) +class CidrLoader(Loader): + def load(self, data: bytes) -> Network: + return cast(Network, ip_network(decode_ascii(data)[0])) diff --git a/psycopg3/psycopg3/utils/codecs.py b/psycopg3/psycopg3/utils/codecs.py deleted file mode 100644 index e9c31b98e..000000000 --- a/psycopg3/psycopg3/utils/codecs.py +++ /dev/null @@ -1,16 +0,0 @@ -""" -Utility module to access fast encoders/decoders -""" - -# Copyright (C) 2020 The Psycopg Team - -import codecs -from typing import Callable, Tuple - -EncodeFunc = Callable[[str], Tuple[bytes, int]] -DecodeFunc = Callable[[bytes], Tuple[str, int]] - -encode_ascii = codecs.lookup("ascii").encode -decode_ascii = codecs.lookup("ascii").decode -encode_utf8 = codecs.lookup("utf8").encode -decode_utf8 = codecs.lookup("utf8").decode diff --git a/tests/types/test_network.py b/tests/types/test_network.py new file mode 100644 index 000000000..c04a9113f --- /dev/null +++ b/tests/types/test_network.py @@ -0,0 +1,89 @@ +import ipaddress + +import pytest + +from psycopg3.adapt import Format + + +@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY]) +@pytest.mark.parametrize("val", ["192.168.0.1", "2001:db8::"]) +def test_address_dump(conn, fmt_in, val): + binary_check(fmt_in) + ph = "%s" if fmt_in == Format.TEXT else "%b" + cur = conn.cursor() + cur.execute(f"select {ph} = %s::inet", (ipaddress.ip_address(val), val)) + assert cur.fetchone()[0] is True + cur.execute( + f"select {ph} = array[null, %s]::inet[]", + ([None, ipaddress.ip_interface(val)], val), + ) + assert cur.fetchone()[0] is True + + +@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY]) +@pytest.mark.parametrize("val", ["127.0.0.1/24", "::ffff:102:300/128"]) +def test_interface_dump(conn, fmt_in, val): + binary_check(fmt_in) + ph = "%s" if fmt_in == Format.TEXT else "%b" + cur = conn.cursor() + cur.execute(f"select {ph} = %s::inet", (ipaddress.ip_interface(val), val)) + assert cur.fetchone()[0] is True + cur.execute( + f"select {ph} = array[null, %s]::inet[]", + ([None, ipaddress.ip_interface(val)], val), + ) + assert cur.fetchone()[0] is True + + +@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY]) +@pytest.mark.parametrize("val", ["127.0.0.0/24", "::ffff:102:300/128"]) +def test_network_dump(conn, fmt_in, val): + binary_check(fmt_in) + ph = "%s" if fmt_in == Format.TEXT else "%b" + cur = conn.cursor() + cur.execute(f"select {ph} = %s::cidr", (ipaddress.ip_network(val), val)) + assert cur.fetchone()[0] is True + cur.execute( + f"select {ph} = array[NULL, %s]::cidr[]", + ([None, ipaddress.ip_network(val)], val), + ) + assert cur.fetchone()[0] is True + + +@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY]) +@pytest.mark.parametrize("val", ["127.0.0.1/32", "::ffff:102:300/128"]) +def test_inet_load_address(conn, fmt_out, val): + binary_check(fmt_out) + cur = conn.cursor(format=fmt_out) + cur.execute("select %s::inet", (val,)) + addr = ipaddress.ip_address(val.split("/", 1)[0]) + assert cur.fetchone()[0] == addr + cur.execute("select array[null, %s::inet]", (val,)) + assert cur.fetchone()[0] == [None, addr] + + +@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY]) +@pytest.mark.parametrize("val", ["127.0.0.1/24", "::ffff:102:300/127"]) +def test_inet_load_network(conn, fmt_out, val): + binary_check(fmt_out) + cur = conn.cursor(format=fmt_out) + cur.execute("select %s::inet", (val,)) + assert cur.fetchone()[0] == ipaddress.ip_interface(val) + cur.execute("select array[null, %s::inet]", (val,)) + assert cur.fetchone()[0] == [None, ipaddress.ip_interface(val)] + + +@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY]) +@pytest.mark.parametrize("val", ["127.0.0.0/24", "::ffff:102:300/128"]) +def test_cidr_load(conn, fmt_out, val): + binary_check(fmt_out) + cur = conn.cursor(format=fmt_out) + cur.execute("select %s::cidr", (val,)) + assert cur.fetchone()[0] == ipaddress.ip_network(val) + cur.execute("select array[null, %s::cidr]", (val,)) + assert cur.fetchone()[0] == [None, ipaddress.ip_network(val)] + + +def binary_check(fmt): + if fmt == Format.BINARY: + pytest.xfail("inet binary not implemented")