def get_isolation_level_values(
self, dbapi_connection: interfaces.DBAPIConnection
) -> List[IsolationLevel]:
- return super().get_isolation_level_values(dbapi_connection) + [ # type: ignore # noqa: E501
+ return super().get_isolation_level_values(dbapi_connection) + [
"AUTOCOMMIT"
]
if hasattr(module, driver):
module = getattr(module, driver)
- return lambda: module.dialect # type: ignore
+ return lambda: module.dialect
else:
return None
else:
return 0
- def __eq__(self, other: Any) -> bool: # type: ignore[override] # noqa: E501
+ def __eq__(self, other: Any) -> bool:
"""Compare this range to the `other` taking into account
bounds inclusivity, returning ``True`` if they are equal.
"""
# most DBAPIs happy with this for execute().
# not cx_oracle.
- execute_sequence_format = tuple # type: ignore
+ execute_sequence_format = tuple
supports_schemas = True
supports_views = True
and compile_state._has_multi_parameters
):
if column._is_multiparam_column:
- index = column.index + 1 # type: ignore
+ index = column.index + 1
d = {column.original.key: parameters[column.key]}
else:
d = {column.key: parameters[column.key]}
param[param_key] = arg
elif is_callable:
self.current_column = c
- param[param_key] = arg(self) # type: ignore
+ param[param_key] = arg(self)
else:
val = fallback(c)
if val is not None:
if default_dispatch is None and hasattr(
target, "_no_async_engine_events"
):
- target._no_async_engine_events() # type: ignore
+ target._no_async_engine_events()
return default_dispatch
_dispatch_target = Dialect
@classmethod
- def _listen( # type: ignore
+ def _listen(
cls,
event_key: event._EventKey[Dialect],
*,
dialect_args[k] = kw.pop(k)
# create dialect
- dialect = dialect_cls(**dialect_args) # type: ignore
+ dialect = dialect_cls(**dialect_args)
return MockConnection(dialect, executor)
cls, init: Callable[..., Any], bind: Union[Engine, Connection]
) -> Inspector:
if hasattr(bind.dialect, "inspector"):
- cls = bind.dialect.inspector # type: ignore[attr-defined]
+ cls = bind.dialect.inspector
self = cls.__new__(cls)
init(self, bind)
if hasattr(bind, "exec_driver_sql"):
self._init_connection(bind) # type: ignore[arg-type]
else:
- self._init_engine(bind) # type: ignore[arg-type]
+ self._init_engine(bind)
def _init_engine(self, engine: Engine) -> None:
self.bind = self.engine = engine
orig_name = col_d["name"]
table.metadata.dispatch.column_reflect(self, table, col_d)
- table.dispatch.column_reflect( # type: ignore[attr-defined]
- self, table, col_d
- )
+ table.dispatch.column_reflect(self, table, col_d)
# fetch name again as column_reflect is allowed to
# change it
@classmethod
def pk_constraint(cls) -> ReflectedPrimaryKeyConstraint:
- return { # type: ignore # pep-655 not supported
+ return {
"name": None,
"constrained_columns": [],
}
indexes: Sequence[int]
new_keys: Sequence[str]
extra: Sequence[Any]
- indexes, new_keys, extra = zip(*metadata_for_keys) # type: ignore
+ indexes, new_keys, extra = zip(*metadata_for_keys)
if self._translated_indexes:
indexes = [self._translated_indexes[idx] for idx in indexes]
else:
_proc = Row
- def process_row( # type: ignore
+ def process_row(
metadata: ResultMetaData,
processors: Optional[_ProcessorsType],
key_to_index: Mapping[_KeyType, int],
connection.info[key] = val = fn(self, connection)
return val
- return decorated # type: ignore
+ return decorated
class _TConsSubject(Protocol):
"Type[_Dispatch[_ET]]",
type(
"%sDispatch" % classname,
- (dispatch_base,), # type: ignore
+ (dispatch_base,),
{"__slots__": event_names},
),
)
assert dispatch_target_cls is not None
if (
hasattr(dispatch_target_cls, "__slots__")
- and "_slots_dispatch" in dispatch_target_cls.__slots__ # type: ignore # noqa: E501
+ and "_slots_dispatch" in dispatch_target_cls.__slots__
):
dispatch_target_cls.dispatch = slots_dispatcher(cls)
else:
):
self.target = target
self.identifier = identifier
- self.fn = fn # type: ignore[assignment]
+ self.fn = fn
if isinstance(fn, types.MethodType):
self.fn_key = id(fn.__func__), id(fn.__self__)
else:
target_assoc = self._unwrap_target_assoc_proxy
if target_assoc is not None:
- inner = target_assoc._criterion_exists( # type: ignore
+ inner = target_assoc._criterion_exists(
criterion=criterion, **kwargs
)
return self._comparator._criterion_exists(inner)
return set(self).symmetric_difference(__s)
def __xor__(self, s: AbstractSet[_S]) -> MutableSet[Union[_T, _S]]:
- return self.symmetric_difference(s) # type: ignore
+ return self.symmetric_difference(s)
def symmetric_difference_update(self, other: Iterable[Any]) -> None:
want, have = self.symmetric_difference(other), set(self)
)
proxy_ref = weakref.ref(
self,
- functools.partial( # type: ignore
- ReversibleProxy._target_gced, target_ref
- ),
+ functools.partial(ReversibleProxy._target_gced, target_ref),
)
ReversibleProxy._proxy_objects[target_ref] = proxy_ref
return self.start().__await__()
async def __aenter__(self) -> _T_co:
- return await self.start(is_ctxmanager=True) # type: ignore
+ return await self.start(is_ctxmanager=True)
@abc.abstractmethod
async def __aexit__(
.. versionadded:: 2.0.0b3
"""
- return self._real_result.closed # type: ignore
+ return self._real_result.closed
class AsyncResult(_WithKeys, AsyncCommon[Row[_TP]]):
def operate(
self, op: OperatorType, *other: Any, **kwargs: Any
) -> ColumnElement[Any]:
- return op(self.expression, *other, **kwargs) # type: ignore
+ return op(self.expression, *other, **kwargs)
def reverse_operate(
self, op: OperatorType, other: Any, **kwargs: Any
from ... import util
-COLUMN: int = util.symbol("COLUMN") # type: ignore
-RELATIONSHIP: int = util.symbol("RELATIONSHIP") # type: ignore
-REGISTRY: int = util.symbol("REGISTRY") # type: ignore
-COLUMN_PROPERTY: int = util.symbol("COLUMN_PROPERTY") # type: ignore
-TYPEENGINE: int = util.symbol("TYPEENGNE") # type: ignore
-MAPPED: int = util.symbol("MAPPED") # type: ignore
-DECLARATIVE_BASE: int = util.symbol("DECLARATIVE_BASE") # type: ignore
-DECLARATIVE_META: int = util.symbol("DECLARATIVE_META") # type: ignore
-MAPPED_DECORATOR: int = util.symbol("MAPPED_DECORATOR") # type: ignore
-COLUMN_PROPERTY: int = util.symbol("COLUMN_PROPERTY") # type: ignore
-SYNONYM_PROPERTY: int = util.symbol("SYNONYM_PROPERTY") # type: ignore
-COMPOSITE_PROPERTY: int = util.symbol("COMPOSITE_PROPERTY") # type: ignore
-DECLARED_ATTR: int = util.symbol("DECLARED_ATTR") # type: ignore
-MAPPER_PROPERTY: int = util.symbol("MAPPER_PROPERTY") # type: ignore
-AS_DECLARATIVE: int = util.symbol("AS_DECLARATIVE") # type: ignore
-AS_DECLARATIVE_BASE: int = util.symbol("AS_DECLARATIVE_BASE") # type: ignore
-DECLARATIVE_MIXIN: int = util.symbol("DECLARATIVE_MIXIN") # type: ignore
-QUERY_EXPRESSION: int = util.symbol("QUERY_EXPRESSION") # type: ignore
+COLUMN: int = util.symbol("COLUMN")
+RELATIONSHIP: int = util.symbol("RELATIONSHIP")
+REGISTRY: int = util.symbol("REGISTRY")
+COLUMN_PROPERTY: int = util.symbol("COLUMN_PROPERTY")
+TYPEENGINE: int = util.symbol("TYPEENGNE")
+MAPPED: int = util.symbol("MAPPED")
+DECLARATIVE_BASE: int = util.symbol("DECLARATIVE_BASE")
+DECLARATIVE_META: int = util.symbol("DECLARATIVE_META")
+MAPPED_DECORATOR: int = util.symbol("MAPPED_DECORATOR")
+SYNONYM_PROPERTY: int = util.symbol("SYNONYM_PROPERTY")
+COMPOSITE_PROPERTY: int = util.symbol("COMPOSITE_PROPERTY")
+DECLARED_ATTR: int = util.symbol("DECLARED_ATTR")
+MAPPER_PROPERTY: int = util.symbol("MAPPER_PROPERTY")
+AS_DECLARATIVE: int = util.symbol("AS_DECLARATIVE")
+AS_DECLARATIVE_BASE: int = util.symbol("AS_DECLARATIVE_BASE")
+DECLARATIVE_MIXIN: int = util.symbol("DECLARATIVE_MIXIN")
+QUERY_EXPRESSION: int = util.symbol("QUERY_EXPRESSION")
# names that must succeed with mypy.api.named_type
NAMED_TYPE_BUILTINS_OBJECT = "builtins.object"
def format_type(typ_: Type, options: Options) -> str:
if mypy_14:
- return _mypy_format_type(typ_, options) # type: ignore
+ return _mypy_format_type(typ_, options)
else:
return _mypy_format_type(typ_) # type: ignore
def class_logger(cls: Type[_IT]) -> Type[_IT]:
logger = logging.getLogger(_qual_logger_name_for_cls(cls))
- cls._should_log_debug = lambda self: logger.isEnabledFor( # type: ignore[assignment] # noqa: E501
+ cls._should_log_debug = lambda self: logger.isEnabledFor( # type: ignore[method-assign] # noqa: E501
logging.DEBUG
)
- cls._should_log_info = lambda self: logger.isEnabledFor( # type: ignore[assignment] # noqa: E501
+ cls._should_log_info = lambda self: logger.isEnabledFor( # type: ignore[method-assign] # noqa: E501
logging.INFO
)
cls.logger = logger
def is_orm_option(
opt: ExecutableOption,
) -> TypeGuard[ORMOption]:
- return not opt._is_core # type: ignore
+ return not opt._is_core
def is_user_defined_option(
entity_namespace = self._entity_namespace
assert isinstance(entity_namespace, HasCacheKey)
- if self.key is _UNKNOWN_ATTR_KEY: # type: ignore[comparison-overlap]
+ if self.key is _UNKNOWN_ATTR_KEY:
annotations = {"entity_namespace": entity_namespace}
else:
annotations = {
def operate(
self, op: OperatorType, *other: Any, **kwargs: Any
) -> ColumnElement[Any]:
- return op(self.comparator, *other, **kwargs) # type: ignore[return-value,no-any-return] # noqa: E501
+ return op(self.comparator, *other, **kwargs) # type: ignore[no-any-return] # noqa: E501
def reverse_operate(
self, op: OperatorType, other: Any, **kwargs: Any
) -> ColumnElement[Any]:
- return op(other, self.comparator, **kwargs) # type: ignore[return-value,no-any-return] # noqa: E501
+ return op(other, self.comparator, **kwargs) # type: ignore[no-any-return] # noqa: E501
def hasparent(
self, state: InstanceState[Any], optimistic: bool = False
# InstrumentedAttribute, while still keeping classlevel
# __doc__ correct
- @util.rw_hybridproperty # type: ignore
- def __doc__(self) -> Optional[str]: # type: ignore
+ @util.rw_hybridproperty
+ def __doc__(self) -> Optional[str]:
return self._doc
@__doc__.setter # type: ignore
- def __doc__(self, value: Optional[str]) -> None: # type: ignore
+ def __doc__(self, value: Optional[str]) -> None:
self._doc = value
@__doc__.classlevel # type: ignore
- def __doc__(cls) -> Optional[str]: # type: ignore
+ def __doc__(cls) -> Optional[str]:
return super().__doc__
def __set__(self, instance: object, value: Any) -> None:
and "None"
or iterable.__class__.__name__
)
- wanted = self._duck_typed_as.__name__ # type: ignore
+ wanted = self._duck_typed_as.__name__
raise TypeError(
"Incompatible collection type: %s is not %s-like"
% (given, wanted)
# which seems to help typing tools interpret the fn as a classmethod
# for situations where needed
if isinstance(fn, classmethod):
- fn = fn.__func__ # type: ignore
+ fn = fn.__func__
self.fget = fn
self._cascading = cascading
"Unmanaged access of declarative attribute %s from "
"non-mapped class %s" % (self.fget.__name__, cls.__name__)
)
- return self.fget(cls) # type: ignore
+ return self.fget(cls)
elif manager.is_mapped:
# the class is mapped, which means we're outside of the declarative
# scan setup, just run the function.
- return self.fget(cls) # type: ignore
+ return self.fget(cls)
# here, we are inside of the declarative scan. use the registry
# that is tracking the values of these attributes.
reg = declarative_scan.declared_attr_reg
if self in reg:
- return reg[self] # type: ignore
+ return reg[self]
else:
reg[self] = obj = self.fget(cls)
- return obj # type: ignore
+ return obj
class _declared_directive(_declared_attr_common, Generic[_T]):
reg = registry(
metadata=metadata, type_annotation_map=type_annotation_map
)
- cls.registry = reg # type: ignore
+ cls.registry = reg
- cls._sa_registry = reg # type: ignore
+ cls._sa_registry = reg
if "metadata" not in cls.__dict__:
- cls.metadata = cls.registry.metadata # type: ignore
+ cls.metadata = cls.registry.metadata
if getattr(cls, "__init__", object.__init__) is object.__init__:
cls.__init__ = cls.registry.constructor
current_transforms: _DataclassArguments
if hasattr(cls, "_sa_apply_dc_transforms"):
- current = cls._sa_apply_dc_transforms # type: ignore[attr-defined]
+ current = cls._sa_apply_dc_transforms
_ClassScanMapperConfig._assert_dc_arguments(current)
sql_type = sqltypes._type_map_get(pt) # type: ignore # noqa: E501
if sql_type is not None:
- sql_type_inst = sqltypes.to_instance(sql_type) # type: ignore
+ sql_type_inst = sqltypes.to_instance(sql_type)
# ... this additional step will reject most
# type -> supertype matches, such as if we had
setattr(cls, k, value)
continue
- our_stuff[k] = value # type: ignore
+ our_stuff[k] = value
def _extract_declared_columns(self) -> None:
our_stuff = self.properties
# mypy disallows plain property override of variable
@property # type: ignore
- def cls(self) -> Type[Any]: # type: ignore
+ def cls(self) -> Type[Any]:
return self._cls() # type: ignore
@cls.setter
@classmethod
def raise_unmapped_for_cls(cls, class_: Type[Any]) -> NoReturn:
if hasattr(class_, "_sa_raise_deferred_config"):
- class_._sa_raise_deferred_config() # type: ignore
+ class_._sa_raise_deferred_config()
raise orm_exc.UnmappedClassError(
class_,
elif hasattr(self.composite_class, "__composite_values__"):
_composite_getters[
self.composite_class
- ] = lambda obj: obj.__composite_values__() # type: ignore
+ ] = lambda obj: obj.__composite_values__()
@util.preload_module("sqlalchemy.orm.properties")
@util.preload_module("sqlalchemy.orm.decl_base")
proxy_attr = self.parent.class_manager[self.key]
proxy_attr.impl.dispatch = proxy_attr.dispatch # type: ignore
- proxy_attr.impl.dispatch._active_history = self.active_history # type: ignore # noqa: E501
+ proxy_attr.impl.dispatch._active_history = self.active_history
# TODO: need a deserialize hook here
def __ne__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
return self._compare(operators.ne, other)
- def __lt__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
+ def __lt__(self, other: Any) -> ColumnElement[bool]:
return self._compare(operators.lt, other)
- def __gt__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
+ def __gt__(self, other: Any) -> ColumnElement[bool]:
return self._compare(operators.gt, other)
- def __le__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
+ def __le__(self, other: Any) -> ColumnElement[bool]:
return self._compare(operators.le, other)
- def __ge__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501
+ def __ge__(self, other: Any) -> ColumnElement[bool]:
return self._compare(operators.ge, other)
# what might be interesting would be if we create
]
if self._adapt_to_entity:
assert self.adapter is not None
- comparisons = [self.adapter(x) for x in comparisons] # type: ignore # noqa: E501
- return sql.and_(*comparisons) # type: ignore
+ comparisons = [self.adapter(x) for x in comparisons]
+ return sql.and_(*comparisons)
def __str__(self) -> str:
return str(self.parent.class_.__name__) + "." + self.key
def deferred_scalar_loader(self):
return self.expired_attribute_loader
- @deferred_scalar_loader.setter # type: ignore[no-redef]
+ @deferred_scalar_loader.setter
@util.deprecated(
"1.4",
message="The ClassManager.deferred_scalar_loader attribute is now "
init_method: Optional[Callable[..., None]] = None,
) -> None:
if mapper:
- self.mapper = mapper # type: ignore[assignment]
+ self.mapper = mapper #
if registry:
registry._add_manager(self)
if declarative_scan:
for key in list(self.originals):
self.uninstall_member(key)
- self.mapper = None # type: ignore
+ self.mapper = None
self.dispatch = None # type: ignore
self.new_init = None
self.info.clear()
# so that mypy sees that __new__ is present. currently
# it's bound to Any as there were other problems not having
# it that way but these can be revisited
- instance = self.class_.__new__(self.class_) # type: ignore
+ instance = self.class_.__new__(self.class_)
if state is None:
state = self._state_constructor(instance, self)
self._state_setter(instance, state)
- return instance # type: ignore[no-any-return]
+ return instance
def setup_instance(
self, instance: _O, state: Optional[InstanceState[_O]] = None
"""
- return self.operate( # type: ignore
- PropComparator.any_op, criterion, **kwargs
- )
+ return self.operate(PropComparator.any_op, criterion, **kwargs)
def has(
self,
"""
- return self.operate( # type: ignore
- PropComparator.has_op, criterion, **kwargs
- )
+ return self.operate(PropComparator.has_op, criterion, **kwargs)
class StrategizedProperty(MapperProperty[_T]):
# interim - polymorphic_on is further refined in
# _configure_polymorphic_setter
- self.polymorphic_on = ( # type: ignore
+ self.polymorphic_on = (
coercions.expect( # type: ignore
roles.ColumnArgumentOrKeyRole,
polymorphic_on,
self.with_polymorphic = None
if self.with_polymorphic and self.with_polymorphic[1] is not None:
- self.with_polymorphic = ( # type: ignore
+ self.with_polymorphic = (
self.with_polymorphic[0],
coercions.expect(
roles.StrictFromClauseRole,
manager = instrumentation.register_class(
self.class_,
mapper=self,
- expired_attribute_loader=util.partial( # type: ignore
+ expired_attribute_loader=util.partial(
loading.load_scalar_attributes, self
),
# finalize flag means instrument the __init__ method
if isinstance(c, str)
else c
for c in (
- coercions.expect( # type: ignore
+ coercions.expect(
roles.DDLConstraintColumnRole,
coerce_pk,
argname="primary_key",
self._wildcard_path_loader_key = (
"loader",
- parent.natural_path + self.prop._wildcard_token, # type: ignore
+ parent.natural_path + self.prop._wildcard_token,
)
self._default_path_loader_key = self.prop._default_path_loader_key
self._loader_key = ("loader", self.natural_path)
try:
return ce.info # type: ignore
except AttributeError:
- return self.prop.info # type: ignore
+ return self.prop.info
def _memoized_attr_expressions(self) -> Sequence[NamedColumn[Any]]:
"""The full sequence of columns referenced by this
def operate(
self, op: OperatorType, *other: Any, **kwargs: Any
) -> ColumnElement[Any]:
- return op(self.__clause_element__(), *other, **kwargs) # type: ignore[return-value,no-any-return] # noqa: E501
+ return op(self.__clause_element__(), *other, **kwargs) # type: ignore[no-any-return] # noqa: E501
def reverse_operate(
self, op: OperatorType, other: Any, **kwargs: Any
) -> ColumnElement[Any]:
col = self.__clause_element__()
- return op(col._bind_param(op, other), col, **kwargs) # type: ignore[return-value,no-any-return] # noqa: E501
+ return op(col._bind_param(op, other), col, **kwargs) # type: ignore[no-any-return] # noqa: E501
def __str__(self) -> str:
if not self.parent or not self.key:
def operate(
self, op: OperatorType, *other: Any, **kwargs: Any
) -> ColumnElement[Any]:
- return op(self.__clause_element__(), *other, **kwargs) # type: ignore[return-value,no-any-return] # noqa: E501
+ return op(self.__clause_element__(), *other, **kwargs) # type: ignore[no-any-return] # noqa: E501
def reverse_operate(
self, op: OperatorType, other: Any, **kwargs: Any
) -> ColumnElement[Any]:
col = self.__clause_element__()
- return op(col._bind_param(op, other), col, **kwargs) # type: ignore[return-value,no-any-return] # noqa: E501
+ return op(col._bind_param(op, other), col, **kwargs) # type: ignore[no-any-return] # noqa: E501
def found_in_pep593_annotated(self) -> Any:
# return a blank mapped_column(). This mapped_column()'s
self._set_entities(entities)
def _set_propagate_attrs(self, values: Mapping[str, Any]) -> Self:
- self._propagate_attrs = util.immutabledict(values) # type: ignore
+ self._propagate_attrs = util.immutabledict(values)
return self
def _set_entities(
return self
def _clone(self, **kw: Any) -> Self:
- return self._generate() # type: ignore
+ return self._generate()
def _get_select_statement_only(self) -> Select[_T]:
if self._statement is not None:
q._set_entities(columns)
if not q.load_options._yield_per:
q.load_options += {"_yield_per": 10}
- return iter(q) # type: ignore
+ return iter(q)
@util.deprecated(
"1.4",
argument = de_optionalize_union_types(argument)
if hasattr(argument, "__origin__"):
- arg_origin = argument.__origin__ # type: ignore
+ arg_origin = argument.__origin__
if isinstance(arg_origin, type) and issubclass(
arg_origin, abc.Collection
):
if argument.__args__: # type: ignore
if isinstance(arg_origin, type) and issubclass(
- arg_origin, typing.Mapping # type: ignore
+ arg_origin, typing.Mapping
):
type_arg = argument.__args__[-1] # type: ignore
else:
f"Generic alias {argument} requires an argument"
)
elif hasattr(argument, "__forward_arg__"):
- argument = argument.__forward_arg__ # type: ignore
+ argument = argument.__forward_arg__
argument = resolve_name_to_real_class_name(
argument, originating_module
% (self.key, type(resolved_argument))
)
- self.entity = entity # type: ignore
+ self.entity = entity
self.target = self.entity.persist_selectable
def _setup_join_conditions(self) -> None:
self.class_ = state_dict["class_"]
self.committed_state = state_dict.get("committed_state", {})
- self._pending_mutations = state_dict.get("_pending_mutations", {}) # type: ignore # noqa E501
- self.parents = state_dict.get("parents", {}) # type: ignore
+ self._pending_mutations = state_dict.get("_pending_mutations", {})
+ self.parents = state_dict.get("parents", {})
self.modified = state_dict.get("modified", False)
self.expired = state_dict.get("expired", False)
if "info" in state_dict:
# IMO mypy should see this one also as returning the same type
# we put into it, but it's not
return (
- self._adapter.traverse(expr) # type: ignore
+ self._adapter.traverse(expr)
._annotate(d)
._set_propagate_attrs(
{"compile_state_plugin": "orm", "plugin_subject": self}
self.deferred_where_criteria = True
self.where_criteria = lambdas.DeferredLambdaElement(
- where_criteria, # type: ignore
+ where_criteria,
roles.WhereHavingRole,
lambda_args=(_WrapUserEntity(wrap_entity),),
opts=lambdas.LambdaOptions(
res = iterable_query.slice(start, stop)
if step is not None:
- return list(res)[None : None : item.step] # type: ignore
+ return list(res)[None : None : item.step]
else:
- return list(res) # type: ignore
+ return list(res)
else:
if item == -1:
_no_negative_indexes()
else:
return annotated, None
- if len(annotated.__args__) != 1: # type: ignore
+ if len(annotated.__args__) != 1:
raise sa_exc.ArgumentError(
"Expected sub-type for Mapped[] annotation"
)
- return annotated.__args__[0], annotated.__origin__ # type: ignore
+ return annotated.__args__[0], annotated.__origin__
# mypy seems to get super confused assigning functions to
# attributes
- self._invoke_creator = self._should_wrap_creator(creator) # type: ignore # noqa: E501
+ self._invoke_creator = self._should_wrap_creator(creator)
@_creator.deleter
def _creator(self) -> None:
# needed for mock testing
del self._creator_arg
- del self._invoke_creator # type: ignore[misc]
+ del self._invoke_creator
def _should_wrap_creator(
self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
# time and invalidation for the logic below to work reliably.
if self.dbapi_connection is None:
- self.info.clear() # type: ignore # our info is always present
+ self.info.clear()
self.__connect()
elif (
self.__pool._recycle > -1
if recycle:
self.__close(terminate=True)
- self.info.clear() # type: ignore # our info is always present
+ self.info.clear()
self.__connect()
return None
@classmethod
- def _listen( # type: ignore[override] # would rather keep **kw
+ def _listen(
cls,
event_key: event._EventKey[Pool],
**kw: Any,
def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
try:
- del self._fairy.current # type: ignore
+ del self._fairy.current
except AttributeError:
pass
.. versionadded:: 2.0.20
"""
- return val # type: ignore
+ return val
@overload
def _annotate(self, values: _AnnotationDict) -> Self:
_values = self._annotations.union(values)
- new = self._with_annotations(_values) # type: ignore
+ new = self._with_annotations(_values)
return new
def _with_annotations(self, values: _AnnotationDict) -> Self:
"""
- @util.decorator # type: ignore
+ @util.decorator
def _generative(
fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
) -> _SelfGenerativeType:
for name in names
]
- @util.decorator # type: ignore
+ @util.decorator
def check(fn, *args, **kw):
# make pylance happy by not including "self" in the argument
# list
raise exc.InvalidRequestError(msg)
return fn(self, *args, **kw)
- return check # type: ignore
+ return check
def _clone(element, **kw):
from_linter.froms[cte._de_clone()] = cte_name
if not is_new_cte and embedded_in_current_named_cte:
- return self.preparer.format_alias(cte, cte_name) # type: ignore[no-any-return] # noqa: E501
+ return self.preparer.format_alias(cte, cte_name)
if cte_pre_alias_name:
text = self.preparer.format_alias(cte, cte_pre_alias_name)
key: Union[ColumnClause[Any], str]
) -> Union[str, Tuple[str, str]]:
str_key = c_key_role(key)
- if hasattr(key, "table") and key.table in _et: # type: ignore
+ if hasattr(key, "table") and key.table in _et:
return (key.table.name, str_key) # type: ignore
else:
- return str_key # type: ignore
+ return str_key
def _getattr_col_key(
col: ColumnClause[Any],
return col.key
else:
- _column_as_key = functools.partial( # type: ignore
+ _column_as_key = functools.partial(
coercions.expect_as_key, roles.DMLColumnRole
)
_getattr_col_key = _col_bind_name = operator.attrgetter("key") # type: ignore # noqa: E501
__visit_name__ = "create_schema"
- stringify_dialect = "default" # type: ignore
+ stringify_dialect = "default"
def __init__(
self,
__visit_name__ = "drop_schema"
- stringify_dialect = "default" # type: ignore
+ stringify_dialect = "default"
def __init__(
self,
*other: Any,
**kwargs: Any,
) -> ColumnElement[Any]:
- return op(self.comparator, *other, **kwargs) # type: ignore[return-value,no-any-return] # noqa: E501
+ return op(self.comparator, *other, **kwargs) # type: ignore[no-any-return] # noqa: E501
def reverse_operate(
self, op: operators.OperatorType, other: Any, **kwargs: Any
) -> ColumnElement[Any]:
- return op(other, self.comparator, **kwargs) # type: ignore[return-value,no-any-return] # noqa: E501
+ return op(other, self.comparator, **kwargs) # type: ignore[no-any-return] # noqa: E501
def _bind_param(
self,
}, *args)'.""",
version="1.4",
)
- return cls._construct_raw(operator) # type: ignore[no-any-return]
+ return cls._construct_raw(operator)
lcc, convert_clauses = cls._process_clauses_for_boolean(
operator,
assert lcc
# just one element. return it as a single boolean element,
# not a list and discard the operator.
- return convert_clauses[0] # type: ignore[no-any-return] # noqa: E501
+ return convert_clauses[0]
@classmethod
def _construct_for_whereclause(
opts: Union[Type[LambdaOptions], LambdaOptions],
):
self.opts = opts
- self.fn = fn # type: ignore[assignment]
+ self.fn = fn
self.parent_lambda = parent_lambda
self.tracker_key = parent_lambda.tracker_key + (fn.__code__,)
)
def against(other: Any) -> Operators:
- return operator(self, other) # type: ignore
+ return operator(self, other)
return against
return self.operate(le, other)
# TODO: not sure why we have this
- __hash__ = Operators.__hash__ # type: ignore
+ __hash__ = Operators.__hash__
def __eq__(self, other: Any) -> ColumnOperators: # type: ignore[override]
"""Implement the ``==`` operator.
elif schema is None:
actual_schema = metadata.schema
else:
- actual_schema = schema # type: ignore
+ actual_schema = schema
key = _get_table_key(name, actual_schema)
if key in metadata.tables:
util.warn(
# Constraint objects plus non-constraint-bound ForeignKey objects
args: List[SchemaItem] = [
- c._copy(**kw)
- for c in self.constraints
- if not c._type_bound # type: ignore
- ] + [
- c._copy(**kw) # type: ignore
- for c in self.foreign_keys
- if not c.constraint
- ]
+ c._copy(**kw) for c in self.constraints if not c._type_bound
+ ] + [c._copy(**kw) for c in self.foreign_keys if not c.constraint]
# ticket #5276
column_kwargs = {}
if for_update == self.for_update:
return self
else:
- return self._clone(for_update) # type: ignore
+ return self._clone(for_update)
def _copy(self) -> FetchedValue:
return FetchedValue(self.for_update)
"and will be removed in a future release.",
)
def copy(self, **kw: Any) -> Self:
- return self._copy(**kw) # type: ignore
+ return self._copy(**kw)
def _copy(self, **kw: Any) -> Self:
raise NotImplementedError()
DEFAULT_NAMING_CONVENTION: _NamingSchemaParameter = util.immutabledict(
- {"ix": "ix_%(column_0_label)s"} # type: ignore[arg-type]
+ {"ix": "ix_%(column_0_label)s"}
)
object, returning a copy of this :class:`_expression.FromClause`.
"""
- return util.preloaded.sql_util.ClauseAdapter(alias).traverse( # type: ignore # noqa: E501
- self
- )
+ return util.preloaded.sql_util.ClauseAdapter(alias).traverse(self)
def corresponding_column(
self, column: KeyedColumnElement[Any], require_embedded: bool = False
continue
for fk in sorted(
b.foreign_keys,
- key=lambda fk: fk.parent._creation_order, # type: ignore
+ key=lambda fk: fk.parent._creation_order,
):
if (
consider_as_foreign_keys is not None
if left is not b:
for fk in sorted(
left.foreign_keys,
- key=lambda fk: fk.parent._creation_order, # type: ignore
+ key=lambda fk: fk.parent._creation_order,
):
if (
consider_as_foreign_keys is not None
Dict[str, ColumnElement[Any]],
]:
with_cols: Dict[str, ColumnElement[Any]] = {
- c._tq_label or c.key: c # type: ignore
+ c._tq_label or c.key: c
for c in self.statement._all_selected_columns
if c._allow_label_resolve
}
c.__dict__ = {k: v for k, v in self.__dict__.items()}
c._is_clone_of = self.__dict__.get("_is_clone_of", self)
- return c # type: ignore
+ return c
@classmethod
def _generate_for_statement(cls, select_stmt: Select[Any]) -> None:
if hasattr(cls, "_generate_cache_attrs") and hasattr(
cls, "_traverse_internals"
):
- cls._generate_cache_attrs() # type: ignore
+ cls._generate_cache_attrs()
_copy_internals.generate_dispatch(
- cls, # type: ignore
- cls._traverse_internals, # type: ignore
+ cls,
+ cls._traverse_internals,
"_generated_copy_internals_traversal",
)
_get_children.generate_dispatch(
- cls, # type: ignore
- cls._traverse_internals, # type: ignore
+ cls,
+ cls._traverse_internals,
"_generated_get_children_traversal",
)
op_fn, addtl_kw = default_comparator.operator_lookup[op.__name__]
if kwargs:
addtl_kw = addtl_kw.union(kwargs)
- return op_fn(self.expr, op, *other, **addtl_kw) # type: ignore
+ return op_fn(self.expr, op, *other, **addtl_kw)
@util.preload_module("sqlalchemy.sql.default_comparator")
def reverse_operate(
op_fn, addtl_kw = default_comparator.operator_lookup[op.__name__]
if kwargs:
addtl_kw = addtl_kw.union(kwargs)
- return op_fn(self.expr, op, other, reverse=True, **addtl_kw) # type: ignore # noqa: E501
+ return op_fn(self.expr, op, other, reverse=True, **addtl_kw)
def _adapt_expression(
self,
best_uppercase = None
if not isinstance(self, TypeEngine):
- return self.__class__ # type: ignore # mypy bug?
+ return self.__class__
for t in self.__class__.__mro__:
if (
return NULLTYPE
if callable(typeobj):
- return typeobj(*arg, **kw) # type: ignore # for pyright
+ return typeobj(*arg, **kw)
else:
return typeobj
if clause is None:
return None
if hasattr(clause, "_limit_offset_value"):
- value = clause._limit_offset_value # type: ignore
+ value = clause._limit_offset_value
return util.asint(value)
else:
return clause
offset_clause = 0
if start != 0:
- offset_clause = offset_clause + start # type: ignore
+ offset_clause = offset_clause + start
if offset_clause == 0:
offset_clause = None
else:
- offset_clause = _offset_or_limit_clause(
- offset_clause # type: ignore
- )
+ offset_clause = _offset_or_limit_clause(offset_clause)
- return limit_clause, offset_clause # type: ignore
+ return limit_clause, offset_clause
return dir(super()) + [str(k) for k in self._data.keys()]
def __add__(self, other: Properties[_F]) -> List[Union[_T, _F]]:
- return list(self) + list(other) # type: ignore
+ return list(self) + list(other)
def __setitem__(self, key: str, obj: _T) -> None:
self._data[key] = obj
self.data = data
self._unique = {}
if via:
- self._data_appender = getattr(data, via) # type: ignore[assignment] # noqa: E501
+ self._data_appender = getattr(data, via)
elif hasattr(data, "append"):
- self._data_appender = cast("List[_T]", data).append # type: ignore[assignment] # noqa: E501
+ self._data_appender = cast("List[_T]", data).append
elif hasattr(data, "add"):
- self._data_appender = cast("Set[_T]", data).add # type: ignore[assignment] # noqa: E501
+ self._data_appender = cast("Set[_T]", data).add
def append(self, item: _T) -> None:
id_ = id(item)
if id_ not in self._unique:
- self._data_appender(item) # type: ignore[call-arg]
+ self._data_appender(item)
self._unique[id_] = True
def __iter__(self) -> Iterator[_T]:
return self.registry.value # type: ignore[no-any-return]
except AttributeError:
val = self.registry.value = self.createfunc()
- return val # type: ignore[no-any-return]
+ return val
def has(self) -> bool:
return hasattr(self.registry, "value")
# Issue for context: https://github.com/python-greenlet/greenlet/issues/173
-class _AsyncIoGreenlet(greenlet): # type: ignore
+class _AsyncIoGreenlet(greenlet):
dead: bool
def __init__(self, fn: Callable[..., Any], driver: greenlet):
"loop is already running; can't call await_fallback() here. "
"Was IO attempted in an unexpected place?"
)
- return loop.run_until_complete(awaitable) # type: ignore[no-any-return] # noqa: E501
+ return loop.run_until_complete(awaitable)
return current.driver.switch(awaitable) # type: ignore[no-any-return]
super().add(e)
def __ior__(self, other: AbstractSet[_S]) -> OrderedSet[Union[_T, _S]]:
- self.update(other) # type: ignore
- return self # type: ignore
+ self.update(other)
+ return self
def union(self, *other: Iterable[_S]) -> OrderedSet[Union[_T, _S]]:
- result: OrderedSet[Union[_T, _S]] = self.copy() # type: ignore
+ result: OrderedSet[Union[_T, _S]] = self.copy()
result.update(*other)
return result
# latest mypy has opinions here, not sure if they implemented
# Concatenate or something
- @decorator # type: ignore
+ @decorator
def warned(fn: _F, *args: Any, **kwargs: Any) -> _F:
for m in check_defaults:
if (defaults[m] is None and kwargs[m] is not None) or (
for param, (version, message) in specs.items()
},
)
- decorated = warned(fn) # type: ignore
+ decorated = warned(fn)
decorated.__doc__ = doc
- return decorated # type: ignore[no-any-return]
+ return decorated
return decorate
clsdict["__doc__"] = doc
clsdict.pop("__dict__", None)
clsdict.pop("__weakref__", None)
- cls = type(cls.__name__, cls.__bases__, clsdict) # type: ignore
+ cls = type(cls.__name__, cls.__bases__, clsdict)
if constructor is not None:
constructor_fn = clsdict[constructor]
else:
doc_only = ""
- @decorator # type: ignore
+ @decorator
def warned(fn: _F, *args: Any, **kwargs: Any) -> _F:
skip_warning = not enable_warnings or kwargs.pop(
"_sa_skip_warning", False
doc = inject_docstring_text(doc, docstring_header, 1)
- decorated = warned(func) # type: ignore
+ decorated = warned(func)
decorated.__doc__ = doc
decorated._sa_warn = lambda: _warn_with_version( # type: ignore
message, version, wtype, stacklevel=3
)
- return decorated # type: ignore[no-any-return]
+ return decorated
fn.__init__, no_self=no_self, _is_init=True
)
elif hasattr(fn, "__func__"):
- return compat.inspect_getfullargspec(fn.__func__) # type: ignore[attr-defined] # noqa: E501
+ return compat.inspect_getfullargspec(fn.__func__)
elif hasattr(fn, "__call__"):
- if inspect.ismethod(fn.__call__): # type: ignore [operator]
- return get_callable_argspec(
- fn.__call__, no_self=no_self # type: ignore [operator]
- )
+ if inspect.ismethod(fn.__call__):
+ return get_callable_argspec(fn.__call__, no_self=no_self)
else:
raise TypeError("Can't inspect callable: %s" % fn)
else:
__name__: str
def __init__(self, fget: Callable[..., _T_co], doc: Optional[str] = None):
- self.fget = fget # type: ignore[assignment]
+ self.fget = fget
self.__doc__ = doc or fget.__doc__
self.__name__ = fget.__name__
__name__: str
def __init__(self, fget: Callable[..., _T], doc: Optional[str] = None):
- # https://github.com/python/mypy/issues/708
- self.fget = fget # type: ignore
+ self.fget = fget
self.__doc__ = doc or fget.__doc__
self.__name__ = fget.__name__
- @overload # type: ignore[override]
+ @overload
def __get__(self: _MA, obj: None, cls: Any) -> _MA:
...
if isinstance(argtype, tuple):
raise exc.ArgumentError(
"Argument '%s' is expected to be one of type %s, got '%s'"
- % (name, " or ".join("'%s'" % a for a in argtype), type(arg)) # type: ignore # noqa: E501
+ % (name, " or ".join("'%s'" % a for a in argtype), type(arg))
)
else:
raise exc.ArgumentError(
self.__doc__ = fget.__doc__
def __get__(self, obj: Any, cls: Optional[type] = None) -> Any:
- return self.fget(cls) # type: ignore
+ return self.fget(cls)
class hybridproperty(Generic[_T]):
show_error_codes = true
incremental = true
-
[[tool.mypy.overrides]]
module = [
"sqlalchemy.*"
]
-warn_unused_ignores = false
+warn_unused_ignores = true
strict = true
from sqlalchemy import create_engine
from sqlalchemy.orm import Mapped
+from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import registry
from sqlalchemy.orm import Session
from sqlalchemy.sql.functions import now
-from sqlalchemy.testing.schema import mapped_column
mapper_registry: registry = registry()
e = create_engine("sqlite:///database.db", echo=True)