"""Get the lines for a Python source file from the cache.
Update the cache if it doesn't contain an entry for this file already."""
- if filename in cache:
- entry = cache[filename]
- if len(entry) != 1:
- return cache[filename][2]
+ entry = cache.get(filename, None)
+ if entry is not None and len(entry) != 1:
+ return entry[2]
try:
return updatecache(filename, module_globals)
def _getlines_from_code(code):
code_id = _make_key(code)
- if code_id in _interactive_cache:
- entry = _interactive_cache[code_id]
- if len(entry) != 1:
- return _interactive_cache[code_id][2]
+ entry = _interactive_cache.get(code_id, None)
+ if entry is not None and len(entry) != 1:
+ return entry[2]
return []
filenames = [filename]
for filename in filenames:
- try:
- entry = cache[filename]
- except KeyError:
- continue
-
- if len(entry) == 1:
+ entry = cache.get(filename, None)
+ if entry is None or len(entry) == 1:
# lazy cache entry, leave it lazy.
continue
size, mtime, lines, fullname = entry
# These import can fail if the interpreter is shutting down
return []
- if filename in cache:
- if len(cache[filename]) != 1:
- cache.pop(filename, None)
+ entry = cache.pop(filename, None)
if _source_unavailable(filename):
return []
# Realise a lazy loader based lookup if there is one
# otherwise try to lookup right now.
- if lazycache(filename, module_globals):
+ lazy_entry = entry if entry is not None and len(entry) == 1 else None
+ if lazy_entry is None:
+ lazy_entry = _make_lazycache_entry(filename, module_globals)
+ if lazy_entry is not None:
try:
- data = cache[filename][0]()
+ data = lazy_entry[0]()
except (ImportError, OSError):
pass
else:
# No luck, the PEP302 loader cannot find the source
# for this module.
return []
- cache[filename] = (
+ entry = (
len(data),
None,
[line + '\n' for line in data.splitlines()],
fullname
)
- return cache[filename][2]
+ cache[filename] = entry
+ return entry[2]
# Try looking through the module search path, which is only useful
# when handling a relative filename.
get_source method must be found, the filename must be a cacheable
filename, and the filename must not be already cached.
"""
- if filename in cache:
- if len(cache[filename]) == 1:
- return True
- else:
- return False
+ entry = cache.get(filename, None)
+ if entry is not None:
+ return len(entry) == 1
+
+ lazy_entry = _make_lazycache_entry(filename, module_globals)
+ if lazy_entry is not None:
+ cache[filename] = lazy_entry
+ return True
+ return False
+
+
+def _make_lazycache_entry(filename, module_globals):
if not filename or (filename.startswith('<') and filename.endswith('>')):
- return False
+ return None
# Try for a __loader__, if available
if module_globals and '__name__' in module_globals:
spec = module_globals.get('__spec__')
if name and get_source:
def get_lines(name=name, *args, **kwargs):
return get_source(name, *args, **kwargs)
- cache[filename] = (get_lines,)
- return True
- return False
+ return (get_lines,)
+ return None
+
+
def _register_code(code, string, name):
entry = (len(string),
for const in code.co_consts:
if isinstance(const, type(code)):
stack.append(const)
- _interactive_cache[_make_key(code)] = entry
+ key = _make_key(code)
+ _interactive_cache[key] = entry
import unittest
import os.path
import tempfile
+import threading
import tokenize
from importlib.machinery import ModuleSpec
from test import support
from test.support import os_helper
+from test.support import threading_helper
from test.support.script_helper import assert_python_ok
self.assertIn(self.unchanged_file, linecache.cache)
+class MultiThreadingTest(unittest.TestCase):
+ @threading_helper.reap_threads
+ @threading_helper.requires_working_threading()
+ def test_read_write_safety(self):
+
+ with tempfile.TemporaryDirectory() as tmpdirname:
+ filenames = []
+ for i in range(10):
+ name = os.path.join(tmpdirname, f"test_{i}.py")
+ with open(name, "w") as h:
+ h.write("import time\n")
+ h.write("import system\n")
+ filenames.append(name)
+
+ def linecache_get_line(b):
+ b.wait()
+ for _ in range(100):
+ for name in filenames:
+ linecache.getline(name, 1)
+
+ def check(funcs):
+ barrier = threading.Barrier(len(funcs))
+ threads = []
+
+ for func in funcs:
+ thread = threading.Thread(target=func, args=(barrier,))
+
+ threads.append(thread)
+
+ with threading_helper.start_threads(threads):
+ pass
+
+ check([linecache_get_line] * 20)
+
+
if __name__ == "__main__":
unittest.main()