# Lines ending with binary operators are OK
W504,
-max-line-length = 80
+max-line-length = 120
async def sendto(self, what, destination, timeout):
async with _maybe_timeout(timeout):
return await self.socket.sendto(what, destination)
- raise dns.exception.Timeout(timeout=timeout) # pragma: no cover
+ raise dns.exception.Timeout(timeout=timeout) # pragma: no cover lgtm[py/unreachable-statement]
async def recvfrom(self, size, timeout):
async with _maybe_timeout(timeout):
return await self.socket.recvfrom(size)
- raise dns.exception.Timeout(timeout=timeout)
+ raise dns.exception.Timeout(timeout=timeout) # lgtm[py/unreachable-statement]
async def close(self):
await self.socket.close()
async def sendall(self, what, timeout):
async with _maybe_timeout(timeout):
return await self.socket.sendall(what)
- raise dns.exception.Timeout(timeout=timeout)
+ raise dns.exception.Timeout(timeout=timeout) # lgtm[py/unreachable-statement]
async def recv(self, size, timeout):
async with _maybe_timeout(timeout):
return await self.socket.recv(size)
- raise dns.exception.Timeout(timeout=timeout)
+ raise dns.exception.Timeout(timeout=timeout) # lgtm[py/unreachable-statement]
async def close(self):
await self.socket.close()
async def sendto(self, what, destination, timeout):
with _maybe_timeout(timeout):
return await self.socket.sendto(what, destination)
- raise dns.exception.Timeout(timeout=timeout) # pragma: no cover
+ raise dns.exception.Timeout(timeout=timeout) # pragma: no cover lgtm[py/unreachable-statement]
async def recvfrom(self, size, timeout):
with _maybe_timeout(timeout):
return await self.socket.recvfrom(size)
- raise dns.exception.Timeout(timeout=timeout)
+ raise dns.exception.Timeout(timeout=timeout) # lgtm[py/unreachable-statement]
async def close(self):
self.socket.close()
async def sendall(self, what, timeout):
with _maybe_timeout(timeout):
return await self.stream.send_all(what)
- raise dns.exception.Timeout(timeout=timeout)
+ raise dns.exception.Timeout(timeout=timeout) # lgtm[py/unreachable-statement]
async def recv(self, size, timeout):
with _maybe_timeout(timeout):
return await self.stream.receive_some(size)
- raise dns.exception.Timeout(timeout=timeout)
+ raise dns.exception.Timeout(timeout=timeout) # lgtm[py/unreachable-statement]
async def close(self):
await self.stream.aclose()
# pylint: disable=unused-import
from dns._asyncbackend import Socket, DatagramSocket, \
- StreamSocket, Backend # noqa:
+ StreamSocket, Backend # noqa: F401 lgtm[py/unused-import]
# pylint: enable=unused-import
(begin_time, expiration) = _compute_times(timeout)
if not sock:
if ssl_context is None:
- ssl_context = ssl.create_default_context()
+ # See the comment about ssl.create_default_context() in query.py
+ ssl_context = ssl.create_default_context() # lgtm[py/insecure-protocol]
if server_hostname is None:
ssl_context.check_hostname = False
else:
import dns.asyncquery
import dns.exception
import dns.query
-import dns.resolver
+import dns.resolver # lgtm[py/import-and-import-from]
# import some resolver symbols for brevity
from dns.resolver import NXDOMAIN, NoAnswer, NotAbsolute, NoRootSOA
return self.to_text()
-class GenericOption(Option):
+class GenericOption(Option): # lgtm[py/missing-equals]
"""Generic Option Class
return cls(otype, parser.get_remaining())
-class ECSOption(Option):
+class ECSOption(Option): # lgtm[py/missing-equals]
"""EDNS Client Subnet (ECS, RFC7871)"""
def __init__(self, address, srclen=None, scopelen=0):
return 65535
-class EDEOption(Option):
+class EDEOption(Option): # lgtm[py/missing-equals]
"""Extended DNS Error (EDE, RFC8914)"""
def __init__(self, code, text=None):
def __init__(self, *args, **kwargs):
self._check_params(*args, **kwargs)
if kwargs:
- self.kwargs = self._check_kwargs(**kwargs)
+ # This call to a virtual method from __init__ is ok in our usage
+ self.kwargs = self._check_kwargs(**kwargs) # lgtm[py/init-calls-subclass]
self.msg = str(self)
else:
self.kwargs = dict() # defined but empty for old mode exceptions
@immutable
-class Dict(collections.abc.Mapping):
+class Dict(collections.abc.Mapping): # lgtm[py/missing-equals]
def __init__(self, dictionary, no_copy=False):
"""Make an immutable dictionary from the specified dictionary.
(af, destination, source) = _destination_and_source(where, port,
source, source_port)
if ssl_context is None and not sock:
- ssl_context = ssl.create_default_context()
+ # LGTM complains about this because the default might permit TLS < 1.2
+ # for compatibility, but the python documentation says that explicit
+ # versioning is deprecated. and that as of python 3.6 it will negotiate
+ # the highest version possible. While we can set a minimum version,
+ # this isn't great either as we might set it lower than a future
+ # python version would.
+ ssl_context = ssl.create_default_context() # lgtm[py/insecure-protocol]
if server_hostname is None:
ssl_context.check_hostname = False
@dns.immutable.immutable
-class ImmutableRdataset(Rdataset):
+class ImmutableRdataset(Rdataset): # lgtm[py/missing-equals]
"""An immutable DNS rdataset."""
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-import dns.rdtypes.dnskeybase
+import dns.rdtypes.dnskeybase # lgtm[py/import-and-import-from]
import dns.immutable
# pylint: disable=unused-import
-from dns.rdtypes.dnskeybase import SEP, REVOKE, ZONE # noqa: F401
+from dns.rdtypes.dnskeybase import SEP, REVOKE, ZONE # noqa: F401 lgtm[py/unused-import]
# pylint: enable=unused-import
@dns.immutable.immutable
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-import dns.rdtypes.dnskeybase
+import dns.rdtypes.dnskeybase # lgtm[py/import-and-import-from]
import dns.immutable
# pylint: disable=unused-import
-from dns.rdtypes.dnskeybase import SEP, REVOKE, ZONE # noqa: F401
+from dns.rdtypes.dnskeybase import SEP, REVOKE, ZONE # noqa: F401 lgtm[py/unused-import]
# pylint: enable=unused-import
@dns.immutable.immutable
for k, v in params.items():
k = ParamKey.make(k)
if not isinstance(v, Param) and v is not None:
- raise ValueError("not a Param")
+ raise ValueError(f"{k:d} not a Param")
self.params = dns.immutable.Dict(params)
# Make sure any parameter listed as mandatory is present in the
# record.
self.items = odict()
if items is not None:
for item in items:
- self.add(item)
+ # This is safe for how we use set, but if other code
+ # subclasses it could be a legitimate issue.
+ self.add(item) # lgtm[py/init-calls-subclass]
def __repr__(self):
return "dns.set.Set(%s)" % repr(list(self.items.keys()))
def pop(self):
"""Remove an arbitrary item from the set."""
- (k, v) = self.items.popitem()
+ (k, _) = self.items.popitem()
return k
def _clone(self):
return 3
-class UpdateMessage(dns.message.Message):
+class UpdateMessage(dns.message.Message): # lgtm[py/missing-equals]
_section_enum = UpdateSection
Transaction = dns.zone.Transaction
-class Zone(dns.zone.Zone):
+class Zone(dns.zone.Zone): # lgtm[py/missing-equals]
__slots__ = ['_versions', '_versions_lock', '_write_txn',
'_write_waiters', '_write_event', '_pruning_policy',
# A node with a version id.
-class VersionedNode(dns.node.Node):
+class VersionedNode(dns.node.Node): # lgtm[py/missing-equals]
__slots__ = ['id']
def __init__(self):
# Template used to display messages. This is a python new-style format string
# used to format the message information. See doc for all details
msg-template='{path}:{line}: [{msg_id}({symbol}), {obj}] {msg})'
+
+[FORMAT]
+max-line-length=120