from collections.abc import Callable
from threading import Lock
- from typing import Concatenate, ParamSpec, TypeVar
-
- P = ParamSpec('P')
- R = TypeVar('R')
+ from typing import Concatenate
# Use this lock to ensure that only one thread is executing a function
# at any time.
my_lock = Lock()
- def with_lock(f: Callable[Concatenate[Lock, P], R]) -> Callable[P, R]:
+ def with_lock[**P, R](f: Callable[Concatenate[Lock, P], R]) -> Callable[P, R]:
'''A type-safe decorator which provides a lock.'''
def inner(*args: P.args, **kwargs: P.kwargs) -> R:
# Provide the lock as the first argument.