+++ /dev/null
-# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
-
-# This implementation of the immutable decorator is for python 3.6,
-# which doesn't have Context Variables. This implementation is somewhat
-# costly for classes with slots, as it adds a __dict__ to them.
-
-
-import inspect
-
-
-class _Immutable:
- """Immutable mixin class"""
-
- # Note we MUST NOT have __slots__ as that causes
- #
- # TypeError: multiple bases have instance lay-out conflict
- #
- # when we get mixed in with another class with slots. When we
- # get mixed into something with slots, it effectively adds __dict__ to
- # the slots of the other class, which allows attribute setting to work,
- # albeit at the cost of the dictionary.
-
- def __setattr__(self, name, value):
- if not hasattr(self, '_immutable_init') or \
- self._immutable_init is not self:
- raise TypeError("object doesn't support attribute assignment")
- else:
- super().__setattr__(name, value)
-
- def __delattr__(self, name):
- if not hasattr(self, '_immutable_init') or \
- self._immutable_init is not self:
- raise TypeError("object doesn't support attribute assignment")
- else:
- super().__delattr__(name)
-
-
-def _immutable_init(f):
- def nf(*args, **kwargs):
- try:
- # Are we already initializing an immutable class?
- previous = args[0]._immutable_init
- except AttributeError:
- # We are the first!
- previous = None
- object.__setattr__(args[0], '_immutable_init', args[0])
- try:
- # call the actual __init__
- f(*args, **kwargs)
- finally:
- if not previous:
- # If we started the initialization, establish immutability
- # by removing the attribute that allows mutation
- object.__delattr__(args[0], '_immutable_init')
- nf.__signature__ = inspect.signature(f)
- return nf
-
-
-def immutable(cls):
- if _Immutable in cls.__mro__:
- # Some ancestor already has the mixin, so just make sure we keep
- # following the __init__ protocol.
- cls.__init__ = _immutable_init(cls.__init__)
- if hasattr(cls, '__setstate__'):
- cls.__setstate__ = _immutable_init(cls.__setstate__)
- ncls = cls
- else:
- # Mixin the Immutable class and follow the __init__ protocol.
- class ncls(_Immutable, cls):
-
- @_immutable_init
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
-
- if hasattr(cls, '__setstate__'):
- @_immutable_init
- def __setstate__(self, *args, **kwargs):
- super().__setstate__(*args, **kwargs)
-
- # make ncls have the same name and module as cls
- ncls.__name__ = cls.__name__
- ncls.__qualname__ = cls.__qualname__
- ncls.__module__ = cls.__module__
- return ncls
return 'asyncio'
except RuntimeError:
raise AsyncLibraryNotFoundError('no async library detected')
- except AttributeError: # pragma: no cover
- # we have to check current_task on 3.6; we ignore for mypy
- # purposes at it is otherwise unhappy on >= 3.7
- if not asyncio.Task.current_task(): # type: ignore
- raise AsyncLibraryNotFoundError('no async library detected')
- return 'asyncio'
def get_default_backend() -> Backend:
if ssl_context is None:
# See the comment about ssl.create_default_context() in query.py
ssl_context = ssl.create_default_context() # lgtm[py/insecure-protocol]
- if sys.version_info >= (3, 7):
- ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
- else:
- ssl_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
+ ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
if server_hostname is None:
ssl_context.check_hostname = False
else:
import collections.abc
import sys
-# pylint: disable=unused-import
-if sys.version_info >= (3, 7):
- odict = dict
- from dns._immutable_ctx import immutable
-else:
- # pragma: no cover
- from collections import OrderedDict as odict
- from dns._immutable_attr import immutable # noqa
-# pylint: enable=unused-import
+from dns._immutable_ctx import immutable
@immutable
of copied. Only set this if you are sure there will be no external
references to the dictionary.
"""
- if no_copy and isinstance(dictionary, odict):
+ if no_copy and isinstance(dictionary, dict):
self._odict = dictionary
else:
- self._odict = odict(dictionary)
+ self._odict = dict(dictionary)
self._hash = None
def __getitem__(self, key):
if isinstance(o, list):
return tuple(constify(elt) for elt in o)
if isinstance(o, dict):
- cdict = odict()
+ cdict = dict()
for k, v in o.items():
cdict[k] = constify(v)
return Dict(cdict, True)
# can, even though this might require a future dnspython release if that
# version becomes deprecated.
ssl_context = ssl.create_default_context() # lgtm[py/insecure-protocol]
- if sys.version_info >= (3, 7):
- ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
- else:
- ssl_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
+ ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
if server_hostname is None:
ssl_context.check_hostname = False
import itertools
import sys
-if sys.version_info >= (3, 7):
- odict = dict
-else:
- from collections import OrderedDict as odict # pragma: no cover
class Set:
*items*, an iterable or ``None``, the initial set of items.
"""
- self.items = odict()
+ self.items = dict()
if items is not None:
for item in items:
# This is safe for how we use set, but if other code
else:
cls = self.__class__
obj = cls.__new__(cls)
- obj.items = odict()
+ obj.items = dict()
obj.items.update(self.items)
return obj
self.items.clear()
def __eq__(self, other):
- if odict == dict:
- return self.items == other.items
- else:
- # We don't want an ordered comparison.
- if len(self.items) != len(other.items):
- return False
- return all(elt in other.items for elt in self.items)
+ return self.items == other.items
def __ne__(self, other):
return not self.__eq__(other)
sniff_result = 'asyncio'
def async_run(self, afunc):
- try:
- runner = asyncio.run
- except AttributeError:
- # this is only needed for 3.6
- def old_runner(awaitable):
- loop = asyncio.get_event_loop()
- return loop.run_until_complete(awaitable)
- runner = old_runner
- return runner(afunc())
+ return asyncio.run(afunc())
def test_sniff(self):
dns.asyncbackend._default_backend = None
self.backend = dns.asyncbackend.set_default_backend('asyncio')
def async_run(self, afunc):
- try:
- runner = asyncio.run
- except AttributeError:
- # this is only needed for 3.6
- def old_runner(awaitable):
- loop = asyncio.get_event_loop()
- return loop.run_until_complete(awaitable)
- runner = old_runner
- return runner(afunc())
+ return asyncio.run(afunc())
def testResolve(self):
async def run():
valid_tls_url = 'https://doh.cleanbrowsing.org/doh/family-filter/'
q = dns.message.make_query('example.com.', dns.rdatatype.A)
# make sure CleanBrowsing's IP address will fail TLS certificate
- # check. On 3.6 we get ssl.CertificateError instead of
- # httpx.ConnectError.
- with self.assertRaises((httpx.ConnectError, ssl.CertificateError)):
+ # check.
+ with self.assertRaises(httpx.ConnectError):
dns.query.https(q, invalid_tls_url, session=self.session,
timeout=4)
# We can't do the Host header and SNI magic with httpx, but
import unittest
import dns.immutable
-import dns._immutable_attr
-
-try:
- import dns._immutable_ctx as immutable_ctx
- _have_contextvars = True
-except ImportError:
- _have_contextvars = False
-
- class immutable_ctx:
- pass
+import dns._immutable_ctx
class ImmutableTestCase(unittest.TestCase):
class DecoratorTestCase(unittest.TestCase):
- immutable_module = dns._immutable_attr
+ immutable_module = dns._immutable_ctx
def make_classes(self):
class A:
with self.assertRaises(TypeError):
B(a, 20)
self.assertEqual(a.a, 10)
-
-
-@unittest.skipIf(not _have_contextvars, "contextvars not available")
-class CtxDecoratorTestCase(DecoratorTestCase):
-
- immutable_module = immutable_ctx
dns.asyncbackend.set_default_backend('asyncio')
async def run():
await async_inbound_xfr()
- try:
- runner = asyncio.run
- except AttributeError:
- # this is only needed for 3.6
- def old_runner(awaitable):
- loop = asyncio.get_event_loop()
- return loop.run_until_complete(awaitable)
- runner = old_runner
- runner(run())
+ asyncio.run(run())
#
# We don't need to do this as it's all generic code, but