else:
text = message
message = category(message)
- modules = None
key = (text, category, lineno)
with _wm._lock:
if registry is None:
# as soon as the OS-level thread ends instead.
local = wrlocal()
if local is not None:
- dct = local.dicts.pop(idt)
+ local.dicts.pop(idt)
wrlocal = ref(self, local_deleted)
wrthread = ref(thread, thread_deleted)
thread.__dict__[key] = wrlocal
try:
return await self._sock_sendfile_native(sock, file,
offset, count)
- except exceptions.SendfileNotAvailableError as exc:
+ except exceptions.SendfileNotAvailableError:
if not fallback:
raise
return await self._sock_sendfile_fallback(sock, file,
try:
return await self._sendfile_native(transport, file,
offset, count)
- except exceptions.SendfileNotAvailableError as exc:
+ except exceptions.SendfileNotAvailableError:
if not fallback:
raise
async def _sock_sendfile_native(self, sock, file, offset, count):
try:
fileno = file.fileno()
- except (AttributeError, io.UnsupportedOperation) as err:
+ except (AttributeError, io.UnsupportedOperation):
raise exceptions.SendfileNotAvailableError("not a regular file")
try:
fsize = os.fstat(fileno).st_size
e = e.__context__
print(f"Error retrieving tasks: {e}")
sys.exit(1)
- except PermissionError as e:
+ except PermissionError:
exit_with_permission_help_text()
"os.sendfile() is not available")
try:
fileno = file.fileno()
- except (AttributeError, io.UnsupportedOperation) as err:
+ except (AttributeError, io.UnsupportedOperation):
raise exceptions.SendfileNotAvailableError("not a regular file")
try:
fsize = os.fstat(fileno).st_size
try:
compiler(source + "\n", filename, symbol, flags=flags)
return None
- except _IncompleteInputError as e:
+ except _IncompleteInputError:
return None
- except SyntaxError as e:
+ except SyntaxError:
pass
# fallthrough
cfile = importlib.util.cache_from_source(fullname)
opt_cfiles[opt_level] = cfile
- head, tail = name[:-3], name[-3:]
+ tail = name[-3:]
if tail == '.py':
if not force:
try:
while True:
try:
_queues.put(self._id, obj, unboundop)
- except QueueFull as exc:
+ except QueueFull:
if timeout is not None and time.time() >= end:
raise # re-raise
time.sleep(_delay)
while True:
try:
obj, unboundop = _queues.get(self._id)
- except QueueEmpty as exc:
+ except QueueEmpty:
if timeout is not None and time.time() >= end:
raise # re-raise
time.sleep(_delay)
"""
try:
obj, unboundop = _queues.get(self._id)
- except QueueEmpty as exc:
+ except QueueEmpty:
raise # re-raise
if unboundop is not None:
assert obj is None, repr(obj)
fields.append(instr.opname.ljust(_OPNAME_WIDTH))
# Column: Opcode argument
if instr.arg is not None:
- arg = repr(instr.arg)
# If opname is longer than _OPNAME_WIDTH, we allow it to overflow into
# the space reserved for oparg. This results in fewer misaligned opargs
# in the disassembly output.
break
try:
data = binascii.a2b_uu(s)
- except binascii.Error as v:
+ except binascii.Error:
# Workaround for broken uuencoders by /Fredrik Lundh
nbytes = (((s[0]-32) & 63) * 4 + 5) // 3
data = binascii.a2b_uu(s[:nbytes])
port = sock.getsockname()[1] # Get proper port
host = self.sock.getsockname()[0] # Get proper host
if self.af == socket.AF_INET:
- resp = self.sendport(host, port)
+ self.sendport(host, port)
else:
- resp = self.sendeprt(host, port)
+ self.sendeprt(host, port)
if self.timeout is not _GLOBAL_DEFAULT_TIMEOUT:
sock.settimeout(self.timeout)
return sock
"""
if callback is None:
callback = print_line
- resp = self.sendcmd('TYPE A')
+ self.sendcmd('TYPE A')
with self.transfercmd(cmd) as conn, \
conn.makefile('r', encoding=self.encoding) as fp:
while 1:
elif file[:2] == '-d':
cmd = 'CWD'
if file[2:]: cmd = cmd + ' ' + file[2:]
- resp = ftp.sendcmd(cmd)
+ ftp.sendcmd(cmd)
elif file == '-p':
ftp.set_pasv(not ftp.passiveserver)
else:
# still adjusting the links.
root = oldroot[NEXT]
oldkey = root[KEY]
- oldresult = root[RESULT]
+ oldresult = root[RESULT] # noqa: F841
root[KEY] = root[RESULT] = None
# Now update the cache dictionary.
self.rpcclt.close()
self.terminate_subprocess()
console = self.tkconsole
- was_executing = console.executing
console.executing = False
self.spawn_subprocess()
try:
self.title(title)
self.viewframe = ViewFrame(self, contents, wrap=wrap)
self.protocol("WM_DELETE_WINDOW", self.ok)
- self.button_ok = button_ok = Button(self, text='Close',
- command=self.ok, takefocus=False)
+ self.button_ok = Button(self, text='Close',
+ command=self.ok, takefocus=False)
self.viewframe.pack(side='top', expand=True, fill='both')
self.is_modal = modal
"""
name = 'PROXYAUTH'
- return self._simple_command('PROXYAUTH', user)
+ return self._simple_command(name, user)
def rename(self, oldmailbox, newmailbox):
try:
self._get_response()
- except self.abort as val:
+ except self.abort:
if __debug__:
if self.debug >= 1:
self.print_log()
try:
optlist, args = getopt.getopt(sys.argv[1:], 'd:s:')
- except getopt.error as val:
+ except getopt.error:
optlist, args = (), ()
stream_command = None
current_parameter = 0
OP = token.OP
- ERRORTOKEN = token.ERRORTOKEN
# token stream always starts with ENCODING token, skip it
t = next(token_stream)
yield "relative_import", (level, fromlist, name)
def scan_code(self, co, m):
- code = co.co_code
scanner = self.scan_opcodes
for what, args in scanner(co):
if what == "store":
# written data and then disconnected -- see Issue 14725.
else:
try:
- res = _winapi.WaitForMultipleObjects(
- [ov.event], False, INFINITE)
+ _winapi.WaitForMultipleObjects([ov.event], False, INFINITE)
except:
ov.cancel()
_winapi.CloseHandle(handle)
while 1:
# Look for a machine, default, or macdef top-level keyword
saved_lineno = lexer.lineno
- toplevel = tt = lexer.get_token()
+ tt = lexer.get_token()
if not tt:
break
elif tt[0] == '#':
try:
if _getfinalpathname(spath) == path:
path = spath
- except ValueError as ex:
+ except ValueError:
# Unexpected, as an invalid path should not have gained a prefix
# at any point, but we ignore this error just in case.
pass
self.values = values
try:
- stop = self._process_args(largs, rargs, values)
+ self._process_args(largs, rargs, values)
except (BadOptionError, OptionValueError) as err:
self.error(str(err))
def save_global(self, obj, name=None):
write = self.write
- memo = self.memo
if name is None:
name = getattr(obj, '__qualname__', None)
i = self.read(1)[0]
try:
self.append(self.memo[i])
- except KeyError as exc:
+ except KeyError:
msg = f'Memo value not found at index {i}'
raise UnpicklingError(msg) from None
dispatch[BINGET[0]] = load_binget
i, = unpack('<I', self.read(4))
try:
self.append(self.memo[i])
- except KeyError as exc:
+ except KeyError:
msg = f'Memo value not found at index {i}'
raise UnpicklingError(msg) from None
dispatch[LONG_BINGET[0]] = load_long_binget
text=True,
encoding="locale",
shell=True)
- except (OSError, subprocess.CalledProcessError) as why:
- #print('Command %s failed: %s' % (cmd, why))
+ except (OSError, subprocess.CalledProcessError):
continue
else:
break
return _readmodule(submodule, parent['__path__'], package)
# Search the path for the module.
- f = None
if inpackage is not None:
search_path = path
else:
items = []
itemsappend = items.append
sourcematch = source.match
- start = source.tell()
while True:
itemsappend(_parse(source, state, verbose, nested + 1,
not nested and not items))
finally:
self._communication_started = True
try:
- sts = self.wait(timeout=self._remaining_time(endtime))
+ self.wait(timeout=self._remaining_time(endtime))
except TimeoutExpired as exc:
exc.timeout = timeout
raise
fd, name = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
try:
_os.unlink(name)
- except BaseException as e:
+ except BaseException:
_os.close(fd)
raise
return fd
return None
encoding = _get_normal_name(match.group(1).decode())
try:
- codec = lookup(encoding)
+ lookup(encoding)
except LookupError:
# This behaviour mimics the Python interpreter
if filename is None:
"""Check if the string color is a legal Tkinter color string.
"""
try:
- rgb = self.cv.winfo_rgb(color)
+ self.cv.winfo_rgb(color)
ok = True
except TK.TclError:
ok = False
if action == "rot":
angle, degPAU = data
self._rotate(-angle*degPAU/self._degreesPerAU)
- dummy = self.undobuffer.pop()
+ self.undobuffer.pop()
elif action == "stamp":
stitem = data[0]
self.clearstamp(stitem)
else:
try:
# Is this a sufficient test for sequence-ness?
- x = len(v)
+ len(v)
except TypeError:
# not a sequence
v = quote_via(str(v), safe, encoding, errors)
binpath = context.bin_path
path = context.env_exe
copier = self.symlink_or_copy
- dirname = context.python_dir
copier(context.executable, path)
if not os.path.islink(path):
os.chmod(path, 0o755)