# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/python/black
- rev: 21.5b1
+ rev: 21.12b0
hooks:
- id: black
- repo: https://github.com/sqlalchemyorg/zimports
- rev: v0.4.0
+ rev: v0.4.1
hooks:
- id: zimports
args:
- --keep-unused-type-checking
- repo: https://github.com/pycqa/flake8
- rev: 3.9.2
+ rev: 4.0.1
hooks:
- id: flake8
additional_dependencies:
- flake8-import-order
- flake8-builtins
- - flake8-docstrings>=1.3.1
+ - flake8-docstrings>=1.6.0
- flake8-rst-docstrings
- # flake8-rst-docstrings depdendency, leaving it here
+ # flake8-rst-docstrings dependency, leaving it here
# in case it requires a version pin
- pydocstyle
- pygments
master_doc = "contents"
# General information about the project.
-project = u"SQLAlchemy"
-copyright = u"2007-2021, the SQLAlchemy authors and contributors" # noqa
+project = "SQLAlchemy"
+copyright = "2007-2021, the SQLAlchemy authors and contributors" # noqa
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
(
"index",
"sqlalchemy",
- u"SQLAlchemy Documentation",
- [u"SQLAlchemy authors"],
+ "SQLAlchemy Documentation",
+ ["SQLAlchemy authors"],
1,
)
]
# -- Options for Epub output -------------------------------------------------
# Bibliographic Dublin Core info.
-epub_title = u"SQLAlchemy"
-epub_author = u"SQLAlchemy authors"
-epub_publisher = u"SQLAlchemy authors"
-epub_copyright = u"2007-2015, SQLAlchemy authors"
+epub_title = "SQLAlchemy"
+epub_author = "SQLAlchemy authors"
+epub_publisher = "SQLAlchemy authors"
+epub_copyright = "2007-2015, SQLAlchemy authors"
# The language of the text. It defaults to the language option
# or en if the language is not set.
local_mapper.local_table.name + "_history",
local_mapper.local_table.metadata,
*cols,
- schema=local_mapper.local_table.schema
+ schema=local_mapper.local_table.schema,
)
else:
# single table inheritance. take any additional columns that may have
expression.BinaryExpression(
binary.right, binary.left, binary.operator
),
- **kwargs
+ **kwargs,
)
return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)
dbname,
owner,
schema,
- **kw
+ **kw,
)
return update_wrapper(wrap, fn)
dbname,
owner,
schema,
- **kw
+ **kw,
)
return update_wrapper(wrap, fn)
json_serializer=None,
json_deserializer=None,
legacy_schema_aliasing=None,
- **opts
+ **opts,
):
self.query_timeout = int(query_timeout or 0)
self.schema_name = schema_name
json_serializer=None,
json_deserializer=None,
is_mariadb=None,
- **kwargs
+ **kwargs,
):
kwargs.pop("use_ansiquotes", None) # legacy
default.DefaultDialect.__init__(self, **kwargs)
binary=False,
unicode=False,
national=False,
- **kw
+ **kw,
):
self.charset = charset
use_binds_for_limits=None,
use_nchar_for_unicode=False,
exclude_tablespaces=("SYSTEM", "SYSAUX"),
- **kwargs
+ **kwargs,
):
default.DefaultDialect.__init__(self, **kwargs)
self._use_nchar_for_unicode = use_nchar_for_unicode
schema=None,
resolve_synonyms=False,
dblink="",
- **kw
+ **kw,
):
if resolve_synonyms:
schema=None,
resolve_synonyms=False,
dblink="",
- **kw
+ **kw,
):
info_cache = kw.get("info_cache")
schema=None,
resolve_synonyms=False,
dblink="",
- **kw
+ **kw,
):
info_cache = kw.get("info_cache")
schema=None,
resolve_synonyms=False,
dblink="",
- **kw
+ **kw,
):
info_cache = kw.get("info_cache")
(view_name, schema, dblink, synonym) = self._prepare_reflection_args(
arraysize=50,
encoding_errors=None,
threaded=None,
- **kwargs
+ **kwargs,
):
OracleDialect.__init__(self, **kwargs)
str,
size,
cursor.arraysize,
- **dialect._cursor_var_unicode_kwargs
+ **dialect._cursor_var_unicode_kwargs,
)
elif dialect.auto_convert_lobs and default_type in (
cx_Oracle.LONG_STRING,
size,
cursor.arraysize,
- **dialect._cursor_var_unicode_kwargs
+ **dialect._cursor_var_unicode_kwargs,
)
elif dialect.auto_convert_lobs and default_type in (
client_encoding=None,
use_native_hstore=True,
use_native_uuid=True,
- **kwargs
+ **kwargs,
):
PGDialect.__init__(self, **kwargs)
if not use_native_hstore:
table_name,
schema=None,
postgresql_ignore_search_path=False,
- **kw
+ **kw,
):
preparer = self.identifier_preparer
table_oid = self.get_table_oid(
*columns,
name=kw.get("name"),
deferrable=kw.get("deferrable"),
- initially=kw.get("initially")
+ initially=kw.get("initially"),
)
self.using = kw.get("using", "gist")
where = kw.get("where")
deferrable=self.deferrable,
initially=self.initially,
where=self.where,
- using=self.using
+ using=self.using,
)
c.dispatch._update(self.dispatch)
return c
executemany_mode="values_only",
executemany_batch_page_size=100,
executemany_values_page_size=1000,
- **kwargs
+ **kwargs,
):
_PGDialect_common_psycopg.__init__(self, **kwargs)
parameters,
template=executemany_values,
fetch=bool(context.compiled.returning),
- **kwargs
+ **kwargs,
)
elif self.executemany_mode & EXECUTEMANY_BATCH:
json_deserializer=None,
_json_serializer=None,
_json_deserializer=None,
- **kwargs
+ **kwargs,
):
default.DefaultDialect.__init__(self, **kwargs)
table_name,
schema=schema,
include_auto_indexes=True,
- **kw
+ **kw,
):
if not idx["name"].startswith("sqlite_autoindex"):
continue
parameters,
execution_options,
*args,
- **kw
+ **kw,
):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.CursorResult`."""
# the direct reference to the "NO_LINTING" object
compiler_linting=int(compiler.NO_LINTING),
server_side_cursors=False,
- **kwargs
+ **kwargs,
):
if server_side_cursors:
connection: "Connection",
table_name: str,
schema: Optional[str] = None,
- **kw
+ **kw,
) -> List[ReflectedColumn]:
"""Return information about columns in ``table_name``.
connection: "Connection",
table_name: str,
schema: Optional[str] = None,
- **kw: Any
+ **kw: Any,
) -> ReflectedPrimaryKeyConstraint:
"""Return information about the primary key constraint on
table_name`.
connection: "Connection",
table_name: str,
schema: Optional[str] = None,
- **kw: Any
+ **kw: Any,
) -> List[ReflectedForeignKeyConstraint]:
"""Return information about foreign_keys in ``table_name``.
connection: "Connection",
view_name: str,
schema: Optional[str] = None,
- **kw: Any
+ **kw: Any,
) -> str:
"""Return view definition.
connection: "Connection",
table_name: str,
schema: Optional[str] = None,
- **kw: Any
+ **kw: Any,
) -> List[ReflectedIndex]:
"""Return information about indexes in ``table_name``.
connection: "Connection",
table_name: str,
schema: Optional[str] = None,
- **kw: Any
+ **kw: Any,
) -> List[ReflectedUniqueConstraint]:
r"""Return information about unique constraints in ``table_name``.
connection: "Connection",
table_name: str,
schema: Optional[str] = None,
- **kw: Any
+ **kw: Any,
) -> List[ReflectedCheckConstraint]:
r"""Return information about check constraints in ``table_name``.
connection: "Connection",
table_name: str,
schema: Optional[str] = None,
- **kw: Any
+ **kw: Any,
) -> Dict[str, Any]:
r"""Return the "options" for the table identified by ``table_name``
as a dictionary.
connection: "Connection",
table_name: str,
schema: Optional[str] = None,
- **kw: Any
+ **kw: Any,
) -> ReflectedTableComment:
r"""Return the "comment" for the table identified by ``table_name``.
connection: "Connection",
table_name: str,
schema: Optional[str] = None,
- **kw: Any
+ **kw: Any,
) -> bool:
"""For internal dialect use, check the existence of a particular table
or view in the database.
connection: "Connection",
sequence_name: str,
schema: Optional[str] = None,
- **kw: Any
+ **kw: Any,
) -> bool:
"""Check the existence of a particular sequence in the database.
# return Decimal('5'). These are equivalent of course.
return DecimalResultProcessor(target_class, "%%.%df" % scale).process
-
except ImportError:
from ._py_processors import int_to_boolean # noqa
from ._py_processors import str_to_date # noqa
schema=referred_schema,
autoload_with=self.bind,
_extend_on=_extend_on,
- **reflection_options
+ **reflection_options,
)
for column in referred_columns:
refspec.append(
autoload_with=self.bind,
schema=sa_schema.BLANK_SCHEMA,
_extend_on=_extend_on,
- **reflection_options
+ **reflection_options,
)
for column in referred_columns:
refspec.append(".".join([referred_table, column]))
refspec,
conname,
link_to_name=True,
- **options
+ **options,
)
)
name,
*idx_cols,
_table=table,
- **dict(list(dialect_options.items()) + [("unique", unique)])
+ **dict(list(dialect_options.items()) + [("unique", unique)]),
)
def _reflect_unique_constraints(
params=None,
execution_options=util.EMPTY_DICT,
bind_arguments=None,
- **kw
+ **kw,
):
"""Execute a statement and return a buffered
:class:`_engine.Result` object.
params=params,
execution_options=execution_options,
bind_arguments=bind_arguments,
- **kw
+ **kw,
)
async def scalar(
params=None,
execution_options=util.EMPTY_DICT,
bind_arguments=None,
- **kw
+ **kw,
):
"""Execute a statement and return a scalar result.
params=params,
execution_options=execution_options,
bind_arguments=bind_arguments,
- **kw
+ **kw,
)
return result.scalar()
params=None,
execution_options=util.EMPTY_DICT,
bind_arguments=None,
- **kw
+ **kw,
):
"""Execute a statement and return scalar results.
params=params,
execution_options=execution_options,
bind_arguments=bind_arguments,
- **kw
+ **kw,
)
return result.scalars()
params=None,
execution_options=util.EMPTY_DICT,
bind_arguments=None,
- **kw
+ **kw,
):
"""Execute a statement and return a streaming
:class:`_asyncio.AsyncResult` object."""
params=params,
execution_options=execution_options,
bind_arguments=bind_arguments,
- **kw
+ **kw,
)
return _result.AsyncResult(result)
params=None,
execution_options=util.EMPTY_DICT,
bind_arguments=None,
- **kw
+ **kw,
):
"""Execute a statement and return a stream of scalar results.
params=params,
execution_options=execution_options,
bind_arguments=bind_arguments,
- **kw
+ **kw,
)
return result.scalars()
referred_cls,
local_cls,
collection_class=collection_class,
- **o2m_kws
+ **o2m_kws,
)
else:
backref_obj = None
foreign_keys=[fk.parent for fk in constraint.elements],
back_populates=relationship_name,
collection_class=collection_class,
- **o2m_kws
+ **o2m_kws,
)
if rel is not None:
referred_cfg.properties[backref_name] = rel
execute_chooser=None,
shards=None,
query_cls=ShardedQuery,
- **kwargs
+ **kwargs,
):
"""Construct a ShardedSession.
primary_key_identity,
identity_token=None,
lazy_loaded_from=None,
- **kw
+ **kw,
):
"""override the default :meth:`.Session._identity_lookup` method so
that we search for a given non-token primary key identity across all
mapper,
primary_key_identity,
identity_token=identity_token,
- **kw
+ **kw,
)
else:
q = self.query(mapper)
primary_key_identity,
identity_token=shard_id,
lazy_loaded_from=lazy_loaded_from,
- **kw
+ **kw,
)
if obj is not None:
return obj
callexpr: CallExpr,
name: str,
*,
- expr_types: Tuple[TypingType[_TArgType], ...]
+ expr_types: Tuple[TypingType[_TArgType], ...],
) -> Optional[_TArgType]:
...
callexpr: CallExpr,
name: str,
*,
- expr_types: Optional[Tuple[TypingType[Any], ...]] = None
+ expr_types: Optional[Tuple[TypingType[Any], ...]] = None,
) -> Optional[Any]:
try:
arg_idx = callexpr.arg_names.index(name)
load_on_unexpire=True,
send_modified_events=True,
accepts_scalar_loader=None,
- **kwargs
+ **kwargs,
):
r"""Construct an AttributeImpl.
trackparent=False,
copy_function=None,
compare_function=None,
- **kwargs
+ **kwargs,
):
super(CollectionAttributeImpl, self).__init__(
class_,
dispatch,
trackparent=trackparent,
compare_function=compare_function,
- **kwargs
+ **kwargs,
)
if copy_function is None:
useobject=False,
impl_class=None,
backref=None,
- **kw
+ **kw,
):
manager = manager_of_class(class_)
statement_hints=self.select_statement._statement_hints,
correlate=self.correlate,
correlate_except=self.correlate_except,
- **self._select_args
+ **self._select_args,
)
inner = inner.alias()
statement,
*self.compound_eager_adapter.copy_and_process(
unwrapped_order_by
- )
+ ),
)
statement.order_by.non_generative(statement, *self.eager_order_by)
statement_hints=self.select_statement._statement_hints,
correlate=self.correlate,
correlate_except=self.correlate_except,
- **self._select_args
+ **self._select_args,
)
if self.eager_order_by:
tablename,
self._metadata_for_cls(manager),
*(tuple(declared_columns) + tuple(args)),
- **table_kw
+ **table_kw,
),
)
else:
target_mapper,
order_by,
query_class=None,
- **kw
+ **kw,
):
super(DynamicAttributeImpl, self).__init__(
class_, key, typecallable, dispatch, **kw
raw=False,
propagate=False,
restore_load_context=False,
- **kw
+ **kw,
):
target, fn = (event_key.dispatch_target, event_key._listen_fn)
with_polymorphic=None,
only_load_props=None,
polymorphic_discriminator=None,
- **kw
+ **kw,
):
if with_polymorphic:
column_collection=column_collection,
memoized_populators=quick_populators,
check_for_adapt=check_for_adapt,
- **kw
+ **kw,
)
if (
deferred=self.deferred,
group=self.group,
active_history=self.active_history,
- *self.columns
+ *self.columns,
)
def _getcommitted(
sj,
foreign_keys=foreign_keys,
back_populates=self.key,
- **kwargs
+ **kwargs,
)
mapper._configure_property(backref_key, relationship)
bind_arguments=None,
_parent_execute_state=None,
_add_event=None,
- **kw
+ **kw,
):
r"""Execute a SQL expression construct.
params=None,
execution_options=util.EMPTY_DICT,
bind_arguments=None,
- **kw
+ **kw,
):
"""Execute a statement and return a scalar result.
params=params,
execution_options=execution_options,
bind_arguments=bind_arguments,
- **kw
+ **kw,
).scalar()
def scalars(
params=None,
execution_options=util.EMPTY_DICT,
bind_arguments=None,
- **kw
+ **kw,
):
"""Execute a statement and return the results as scalars.
params=params,
execution_options=execution_options,
bind_arguments=bind_arguments,
- **kw
+ **kw,
).scalars()
def close(self):
autoflush=True,
expire_on_commit=True,
info=None,
- **kw
+ **kw,
):
r"""Construct a new :class:`.sessionmaker`.
proxy_property=None,
active_history=False,
impl_class=None,
- **kw
+ **kw,
):
listen_hooks = []
impl_class=impl_class,
send_modified_events=not useobject or not prop.viewonly,
doc=prop.doc,
- **kw
+ **kw,
)
for hook in listen_hooks:
loadopt,
adapter,
column_collection=None,
- **kwargs
+ **kwargs,
):
for c in self.columns:
if adapter:
column_collection,
memoized_populators,
check_for_adapt=False,
- **kwargs
+ **kwargs,
):
for c in self.columns:
if adapter:
adapter,
column_collection,
memoized_populators,
- **kwargs
+ **kwargs,
):
columns = None
if loadopt and "expression" in loadopt.local_opts:
column_collection,
memoized_populators,
only_load_props=None,
- **kw
+ **kw,
):
if (
adapter,
column_collection,
memoized_populators,
- **kw
+ **kw,
)
elif self.is_class_level:
memoized_populators[self.parent_property] = _SET_DEFERRED_EXPIRED
orig_query, compile_state=orig_compile_state
)
if ent["entity"] is not None
- }
+ },
)
# select from the identity columns of the outer (specifically, these
column_collection=None,
parentmapper=None,
chained_from_outerjoin=False,
- **kwargs
+ **kwargs,
):
"""Add a left outer join to the statement that's being constructed."""
expression.ClauseList(
_literal_as_text_role=roles.ColumnsClauseRole,
group=False,
- *[e._annotations.get("bundle", e) for e in self.exprs]
+ *[e._annotations.get("bundle", e) for e in self.exprs],
)
._annotate(annotations)
._set_propagate_attrs(
max_overflow=10,
timeout=30.0,
use_lifo=False,
- **kw
+ **kw,
):
r"""
Construct a QueuePool.
apply_propagate_attrs=None,
argname=None,
post_inspect=False,
- **kw
+ **kw,
):
if (
role.allows_lambda
resolved,
argname=argname,
original_element=original_element,
- **kw
+ **kw,
)
return resolved
else:
resolved=resolved,
advice=advice,
code=code,
- **kw
+ **kw,
)
argname=None,
explicit_subquery=False,
allow_select=True,
- **kw
+ **kw,
):
if resolved._is_select_statement:
if explicit_subquery:
resolved,
argname=None,
allow_select=False,
- **kw
+ **kw,
):
if resolved._is_select_statement and allow_select:
util.warn_deprecated(
column_keys=None,
for_executemany=False,
linting=NO_LINTING,
- **kwargs
+ **kwargs,
):
"""Construct a new :class:`.SQLCompiler` object.
return self.process(
element.element,
within_columns_clause=within_columns_clause,
- **kwargs
+ **kwargs,
)
def visit_textual_label_reference(
within_columns_clause=False,
render_label_as_label=None,
result_map_targets=(),
- **kw
+ **kw,
):
# only render labels within the columns clause
# or ORDER BY clause of a select. dialect-specific compilers
self,
within_columns_clause=True,
within_label_clause=True,
- **kw
+ **kw,
)
+ OPERATORS[operators.as_]
+ self.preparer.format_label(label, labelname)
add_to_result_map=None,
include_table=True,
result_map_targets=(),
- **kwargs
+ **kwargs,
):
name = orig_name = column.name
if name is None:
eager_grouping=False,
from_linter=None,
lateral_from_linter=None,
- **kw
+ **kw,
):
if from_linter and operators.is_comparison(binary.operator):
if lateral_from_linter is not None:
opstring,
from_linter=from_linter,
lateral_from_linter=lateral_from_linter,
- **kw
+ **kw,
)
def visit_function_as_comparison_op_binary(self, element, operator, **kw):
return self._generate_generic_binary(
element,
" " + self.escape_literal_column(operator.opstring) + " ",
- **kw
+ **kw,
)
def visit_custom_op_unary_operator(self, element, operator, **kw):
return self._generate_generic_binary(
binary,
" NOT BETWEEN SYMMETRIC " if symmetric else " NOT BETWEEN ",
- **kw
+ **kw,
)
def visit_regexp_match_op_binary(self, binary, operator, **kw):
skip_bind_expression=False,
literal_execute=False,
render_postcompile=False,
- **kwargs
+ **kwargs,
):
if not skip_bind_expression:
impl = bindparam.type.dialect_impl(self.dialect)
literal_binds=literal_binds,
literal_execute=literal_execute,
render_postcompile=render_postcompile,
- **kwargs
+ **kwargs,
)
if bindparam.expanding:
# for postcompile w/ expanding, move the "wrapped" part
post_compile=post_compile,
expanding=bindparam.expanding,
bindparam_type=bindparam.type,
- **kwargs
+ **kwargs,
)
if bindparam.expanding:
expanding=False,
escaped_from=None,
bindparam_type=None,
- **kw
+ **kw,
):
if self.positional:
fromhints=None,
visiting_cte=None,
from_linter=None,
- **kwargs
+ **kwargs,
):
self._init_cte_state()
lateral=False,
enclosing_alias=None,
from_linter=None,
- **kwargs
+ **kwargs,
):
if lateral:
fromhints=fromhints,
lateral=lateral,
enclosing_alias=alias,
- **kwargs
+ **kwargs,
)
if subquery and (asfrom or lateral):
inner = "(%s)" % (inner,)
elements.Tuple(
types=element._column_types, *elem
).self_group(),
- **kw
+ **kw,
)
for chunk in element._data
for elem in chunk
select_wraps_for=None,
lateral=False,
from_linter=None,
- **kwargs
+ **kwargs,
):
assert select_wraps_for is None, (
"SQLAlchemy 1.4 requires use of "
asfrom=True,
fromhints=byfrom,
from_linter=from_linter,
- **kwargs
+ **kwargs,
)
for f in froms
]
self,
asfrom=True,
from_linter=from_linter,
- **kwargs
+ **kwargs,
)
for f in froms
]
fromhints=None,
use_schema=True,
from_linter=None,
- **kwargs
+ **kwargs,
):
if from_linter:
from_linter.froms[table] = table.fullname
update_stmt.table,
render_extra_froms,
dialect_hints,
- **kw
+ **kw,
)
if extra_from_text:
text += " " + extra_from_text
delete_stmt.table,
extra_froms,
dialect_hints,
- **kw
+ **kw,
)
if extra_from_text:
text += " " + extra_from_text
name=_col_bind_name(c)
if not compile_state._has_multi_parameters
else "%s_m0" % _col_bind_name(c),
- **kw
+ **kw,
)
elif value._is_bind_parameter:
value = _handle_values_anonymous_param(
name=_col_bind_name(c)
if not compile_state._has_multi_parameters
else "%s_m0" % _col_bind_name(c),
- **kw
+ **kw,
)
else:
# value is a SQL expression
value,
required=value is REQUIRED,
name=_col_bind_name(c),
- **kw # TODO: no test coverage for literal binds here
+ **kw, # TODO: no test coverage for literal binds here
)
elif value._is_bind_parameter:
value = _handle_values_anonymous_param(
col,
row[key],
name="%s_m%d" % (col.key, i + 1),
- **kw
+ **kw,
)
else:
new_param = compiler.process(row[key].self_group(), **kw)
_python_is_types=(util.NoneType, bool),
_any_all_expr=False,
result_type=None,
- **kwargs
+ **kwargs,
):
if result_type is None:
negate=operators.not_match_op
if op is operators.match_op
else operators.match_op,
- **kw
+ **kw,
)
negate=operators.not_regexp_match_op
if op is operators.regexp_match_op
else operators.regexp_match_op,
- **kw
+ **kw,
)
prefixes=None,
returning=None,
return_defaults=False,
- **dialect_kw
+ **dialect_kw,
):
"""Construct an :class:`_expression.Insert` object.
returning=None,
return_defaults=False,
preserve_parameter_order=False,
- **dialect_kw
+ **dialect_kw,
):
r"""Construct an :class:`_expression.Update` object.
whereclause=None,
returning=None,
prefixes=None,
- **dialect_kw
+ **dialect_kw,
):
r"""Construct :class:`_expression.Delete` object.
column_keys=None,
for_executemany=False,
schema_translate_map=None,
- **kw
+ **kw,
):
if compiled_cache is not None and dialect._supports_statement_cache:
elem_cache_key = self._generate_cache_key()
column_keys=column_keys,
for_executemany=for_executemany,
schema_translate_map=schema_translate_map,
- **kw
+ **kw,
)
compiled_cache[key] = compiled_sql
else:
column_keys=column_keys,
for_executemany=for_executemany,
schema_translate_map=schema_translate_map,
- **kw
+ **kw,
)
if not dialect._supports_statement_cache:
if partition_by is not None:
self.partition_by = ClauseList(
*util.to_list(partition_by),
- _literal_as_text_role=roles.ByOfRole
+ _literal_as_text_role=roles.ByOfRole,
)
if range_:
name=None,
name_is_truncatable=False,
disallow_is_literal=False,
- **kw
+ **kw,
):
c = ColumnClause(
coercions.expect(roles.TruncatedLabelRole, name or self.name)
name=None,
name_is_truncatable=False,
disallow_is_literal=False,
- **kw
+ **kw,
):
# the "is_literal" flag normally should never be propagated; a proxied
# column is always a SQL identifier and never the actual expression
_compared_to_type=self.type,
unique=True,
type_=type_,
- **kw
+ **kw,
)
def self_group(self, against=None):
_compared_to_type=self.type,
type_=type_,
unique=True,
- **kw
+ **kw,
)
self._resolved = clone(
self._resolved,
deferred_copy_internals=deferred_copy_internals,
- **kw
+ **kw,
)
@util.memoized_property
self._init_items(
*args,
- allow_replacements=extend_existing or keep_existing or autoload
+ allow_replacements=extend_existing or keep_existing or autoload,
)
def _autoload(
schema=schema,
comment=self.comment,
*args,
- **self.kwargs
+ **self.kwargs,
)
for c in self.constraints:
if isinstance(c, ForeignKeyConstraint):
for expr in index.expressions
],
_table=table,
- **index.kwargs
+ **index.kwargs,
)
return self._schema_item_copy(table)
doc=self.doc,
comment=self.comment,
*args,
- **column_kwargs
+ **column_kwargs,
)
return self._schema_item_copy(c)
primary_key=self.primary_key,
nullable=self.nullable,
_proxies=[self],
- *fk
+ *fk,
)
except TypeError as err:
raise TypeError(
link_to_name=False,
match=None,
info=None,
- **dialect_kw
+ **dialect_kw,
):
r"""
Construct a column-level FOREIGN KEY.
initially=self.initially,
link_to_name=self.link_to_name,
match=self.match,
- **self._unvalidated_dialect_kw
+ **self._unvalidated_dialect_kw,
)
return self._schema_item_copy(fk)
deferrable=self.deferrable,
initially=self.initially,
match=self.match,
- **self._unvalidated_dialect_kw
+ **self._unvalidated_dialect_kw,
)
self.constraint._append_element(column, self)
self.constraint._set_parent_with_dispatch(table)
_create_rule=None,
info=None,
_type_bound=False,
- **dialect_kw
+ **dialect_kw,
):
r"""Create a SQL constraint.
_copy_expression(expr, self.parent, target_table)
for expr in self.columns
],
- **constraint_kwargs
+ **constraint_kwargs,
)
return self._schema_item_copy(c)
_create_rule=None,
_autoattach=True,
_type_bound=False,
- **kw
+ **kw,
):
r"""Construct a CHECK constraint.
_type_bound=_type_bound,
_autoattach=_autoattach,
*columns,
- **kw
+ **kw,
)
if table is not None:
self._set_parent_with_dispatch(table)
match=None,
table=None,
info=None,
- **dialect_kw
+ **dialect_kw,
):
r"""Construct a composite-capable FOREIGN KEY.
deferrable=deferrable,
initially=initially,
info=info,
- **dialect_kw
+ **dialect_kw,
)
self.onupdate = onupdate
self.ondelete = ondelete
match=self.match,
deferrable=self.deferrable,
initially=self.initially,
- **self.dialect_kwargs
+ **self.dialect_kwargs,
)
for refcol in refcolumns
]
self,
*expressions,
_column_flag=_column_flag,
- _gather_expressions=self.expressions
+ _gather_expressions=self.expressions,
)
if table is not None:
extend_existing=False,
autoload_replace=True,
resolve_fks=True,
- **dialect_kwargs
+ **dialect_kwargs,
):
r"""Load all available table definitions from the database.
whereclause=whereclause,
values=values,
inline=inline,
- **kwargs
+ **kwargs,
)
@util.preload_module("sqlalchemy.sql.dml")
onclause=None,
*,
isouter=False,
- full=False
+ full=False,
) -> SelfSelect:
r"""Create a SQL JOIN against this :class:`_expression.Select`
object's criterion
*util.preloaded.sql_util.reduce_columns(
self._all_selected_columns,
only_synonyms=only_synonyms,
- *(self._where_criteria + self._from_obj)
+ *(self._where_criteria + self._from_obj),
)
)
inherit_schema=self.inherit_schema,
metadata=metadata,
_create_events=_create_events,
- **kw
+ **kw,
)
def create(self, bind, checkfirst=False):
idx
for idx, char in enumerate(id_)
if char in ("n", "r", "s", "a")
- ]
+ ],
)
fns = [
(operator.itemgetter(idx), _combination_id_fns[char])
# there's no way to make this happen with some drivers like
# mysqlclient, pymysql. this at least does produce a non-
# ascii error message for cx_oracle, psycopg2
- conn.execute(select(literal_column(u"méil")))
+ conn.execute(select(literal_column("méil")))
assert False
except exc.DBAPIError as err:
err_str = str(err)
# https://www.arbinada.com/en/node/1645
sa.UniqueConstraint("name", name="user_tmp_uq_%s" % config.ident),
sa.Index("user_tmp_ix", "foo"),
- **kw
+ **kw,
)
if (
testing.requires.view_reflection.enabled
t = Table(
"t",
metadata,
- *[Column("t%d" % i, type_) for i, type_ in enumerate(types)]
+ *[Column("t%d" % i, type_) for i, type_ in enumerate(types)],
)
t.create(connection)
for d in combinations
],
id_="i" + ("a" * len(keys)),
- argnames=",".join(keys)
+ argnames=",".join(keys),
)
[tool.black]
line-length = 79
-target-version = ['py27', 'py36']
+target-version = ['py37']
Column(
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
- *[Column("col%d" % i, Integer) for i in range(10)]
+ *[Column("col%d" % i, Integer) for i in range(10)],
)
class Wide:
**dict(
(letter, "%s%d" % (letter, i))
for letter in ["x", "y", "z", "p", "q", "r"]
- )
+ ),
)
for i in range(1, 1001)
]
Column(
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
- *make_some_columns()
+ *make_some_columns(),
)
Table(
"b",
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("a_id", ForeignKey("a.id")),
- *make_some_columns()
+ *make_some_columns(),
)
Table(
"c",
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("b_id", ForeignKey("b.id")),
- *make_some_columns()
+ *make_some_columns(),
)
Table(
"d",
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("c_id", ForeignKey("c.id")),
- *make_some_columns()
+ *make_some_columns(),
)
Table(
"e",
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("a_id", ForeignKey("a.id")),
- *make_some_columns()
+ *make_some_columns(),
)
Table(
"f",
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("e_id", ForeignKey("e.id")),
- *make_some_columns()
+ *make_some_columns(),
)
Table(
"g",
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("e_id", ForeignKey("e.id")),
- *make_some_columns()
+ *make_some_columns(),
)
@classmethod
Column(
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
- *make_some_columns()
+ *make_some_columns(),
)
Table(
"b",
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("a_id", ForeignKey("a.id")),
- *make_some_columns()
+ *make_some_columns(),
)
Table(
"c",
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("b_id", ForeignKey("b.id")),
- *make_some_columns()
+ *make_some_columns(),
)
Table(
"d",
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("b_id", ForeignKey("b.id")),
- *make_some_columns()
+ *make_some_columns(),
)
Table(
"e",
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("b_id", ForeignKey("b.id")),
- *make_some_columns()
+ *make_some_columns(),
)
Table(
"f",
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("b_id", ForeignKey("b.id")),
- *make_some_columns()
+ *make_some_columns(),
)
Table(
"g",
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("a_id", ForeignKey("a.id")),
- *make_some_columns()
+ *make_some_columns(),
)
@classmethod
*[
Column("field%d" % fnum, String(50))
for fnum in range(NUM_FIELDS)
- ]
+ ],
)
Table(
"table2",
*[
Column("field%d" % fnum, Unicode(50))
for fnum in range(NUM_FIELDS)
- ]
+ ],
)
@classmethod
[
{
"id": 1,
- "txt": u"foo",
+ "txt": "foo",
"dt2": datetime.datetime(2020, 1, 1, 1, 1, 1),
},
{
"id": 2,
- "txt": u"bar",
+ "txt": "bar",
"dt2": datetime.datetime(2020, 2, 2, 2, 2, 2),
},
],
*[
Column("long_named_column_number_%d" % i, Integer)
for i in range(col_num)
- ]
+ ],
)
cls.view_str = (
view_str
metadata,
Column("c1", Integer()),
comment=comment,
- **kwargs
+ **kwargs,
)
conn = connection
version="0.0.0",
paramstyle="named",
),
- **kw
+ **kw,
)
dialect._get_server_version_info = server_version_info
long_text.create(connection)
if isinstance(datatype, UnicodeText):
- word_seed = u"ab🐍’«cdefg"
+ word_seed = "ab🐍’«cdefg"
else:
word_seed = "abcdef"
- some_text = u" ".join(
+ some_text = " ".join(
"".join(random.choice(word_seed) for j in range(150))
for i in range(datasize)
)
)
long_text.create(connection)
- word_seed = u"ab🐍’«cdefg"
+ word_seed = "ab🐍’«cdefg"
- some_text = u" ".join(
+ some_text = " ".join(
"".join(random.choice(word_seed) for j in range(10))
for i in range(15)
)
"name",
ENUM(
*("beans", "means", "keens", "faux", "beau", "flow"),
- name="my_enum"
+ name="my_enum",
),
),
)
{"x": "x2", "y": "y2"},
{"x": "x3", "y": "y3"},
),
- **expected_kwargs
+ **expected_kwargs,
)
],
)
{"x": "x2", "y": "y2"},
{"x": "x3", "y": "y3"},
),
- **expected_kwargs
+ **expected_kwargs,
)
],
)
{"x": "x2", "y": "y2"},
{"x": "x3", "y": "y3"},
),
- **expected_kwargs
+ **expected_kwargs,
)
],
)
{"xval": "x1", "yval": "y5"},
{"xval": "x3", "yval": "y6"},
),
- **expected_kwargs
+ **expected_kwargs,
)
],
)
eq_(
check_constraints,
{
- u"cc1": u"(a > 1) AND (a < 5)",
- u"cc2": u"(a = 1) OR ((a > 2) AND (a < 5))",
- u"cc3": u"is_positive(a)",
- u"cc4": u"(b)::text <> 'hi\nim a name \nyup\n'::text",
+ "cc1": "(a > 1) AND (a < 5)",
+ "cc2": "(a = 1) OR ((a > 2) AND (a < 5))",
+ "cc3": "is_positive(a)",
+ "cc4": "(b)::text <> 'hi\nim a name \nyup\n'::text",
},
)
{
"id": 1,
"intarr": [1, 2, 3],
- "strarr": [u"one", u"two", u"three"],
+ "strarr": ["one", "two", "three"],
},
{
"id": 2,
"intarr": [4, 5, 6],
- "strarr": [u"four", u"five", u"six"],
+ "strarr": ["four", "five", "six"],
},
- {"id": 3, "intarr": [1, 5], "strarr": [u"one", u"five"]},
+ {"id": 3, "intarr": [1, 5], "strarr": ["one", "five"]},
{"id": 4, "intarr": [], "strarr": []},
],
)
.where(
arrtable.c.strarr.in_(
[
- [u"one", u"five"],
- [u"four", u"five", u"six"],
- [u"nine", u"ten"],
+ ["one", "five"],
+ ["four", "five", "six"],
+ ["nine", "ten"],
]
)
)
[
{
"unique": 0,
- "name": u"ix_main_l_bar",
- "column_names": [u"bar"],
+ "name": "ix_main_l_bar",
+ "column_names": ["bar"],
}
],
)
"%s must be a string" % argname,
url.URL.create,
"somedriver",
- **{argname: value}
+ **{argname: value},
)
@testing.combinations("username", "host", "database", argnames="argname")
TypeError,
"%s must be a string" % argname,
u1.set,
- **{argname: 35.8}
+ **{argname: 35.8},
)
def test_only_str_query_key_constructor(self):
creator=lambda: self.dbapi.connect("foo.db"),
pre_ping=pre_ping,
dialect=dialect,
- **(pool_kw if pool_kw else {})
+ **(pool_kw if pool_kw else {}),
)
dialect.is_disconnect = lambda e, conn, cursor: isinstance(
foo = Table(
"foo",
meta,
- *[Column(n, sa.String(30)) for n in ["a", "b", "c", "d", "e", "f"]]
+ *[
+ Column(n, sa.String(30))
+ for n in ["a", "b", "c", "d", "e", "f"]
+ ],
)
meta.create_all(connection)
meta2 = MetaData()
"Association proxy D.c refers to an attribute 'csub_only_data'",
fn,
*arg,
- **kw
+ **kw,
)
def _assert_raises_attribute(self, message, fn, *arg, **kw):
attrname,
local_cls,
referred_cls,
- **kw
+ **kw,
)
Base.prepare(generate_relationship=_gen_relationship)
result = bq(sess).all()
if cond1:
- eq_(result, [(8, u"ed"), (9, u"fred"), (10, u"chuck")])
+ eq_(result, [(8, "ed"), (9, "fred"), (10, "chuck")])
else:
eq_(result, [(7, "jack")])
Address,
order_by=self.tables.addresses.c.id,
lazy=lazy,
- **kw
+ **kw,
)
},
)
shard_chooser=lambda *args: "main",
id_chooser=lambda *args: ["fake", "main"],
execute_chooser=lambda *args: ["fake", "main"],
- **kw
+ **kw,
)
def test_refresh(self):
Manager(
status="AAB",
manager_name="manager1",
- **{person_attribute_name: "pointy haired boss"}
+ **{person_attribute_name: "pointy haired boss"},
),
Engineer(
status="BBA",
engineer_name="engineer1",
primary_language="java",
- **{person_attribute_name: "dilbert"}
+ **{person_attribute_name: "dilbert"},
),
]
if include_base:
status="CGG",
engineer_name="engineer2",
primary_language="python",
- **{person_attribute_name: "wally"}
+ **{person_attribute_name: "wally"},
),
Manager(
status="ABA",
manager_name="manager2",
- **{person_attribute_name: "jsmith"}
+ **{person_attribute_name: "jsmith"},
),
]
status="BBB",
manager_name="boss",
golf_swing="fore",
- **{person_attribute_name: "daboss"}
+ **{person_attribute_name: "daboss"},
)
session.add(daboss)
assert_raises(sa_exc.DBAPIError, session.flush)
uselist=uselist,
useobject=useobject,
active_history=active_history,
- **kw
+ **kw,
)
return Foo
"t",
m,
Column("id", Integer, primary_key=True),
- *[Column(name, Integer) for name in names]
+ *[Column(name, Integer) for name in names],
)
m = self.mapper_registry.map_imperatively(MyClass, t)
self.m2mleft,
self.m2mright,
secondary=self.m2msecondary,
- **kw
+ **kw,
)
def _join_fixture_m2m_backref(self, **kw):
self.selfref,
self.selfref,
remote_side=set([self.selfref.c.id]),
- **kw
+ **kw,
)
def _join_fixture_o2m_composite_selfref(self, **kw):
self.composite_selfref,
self.composite_selfref,
self.composite_selfref,
- **kw
+ **kw,
)
def _join_fixture_m2o_composite_selfref(self, **kw):
self.composite_selfref.c.group_id,
]
),
- **kw
+ **kw,
)
def _join_fixture_o2m_composite_selfref_func(self, **kw):
self.composite_selfref.c.parent_id
== self.composite_selfref.c.id,
),
- **kw
+ **kw,
)
def _join_fixture_o2m_composite_selfref_func_remote_side(self, **kw):
== self.composite_selfref.c.id,
),
remote_side=set([self.composite_selfref.c.parent_id]),
- **kw
+ **kw,
)
def _join_fixture_o2m_composite_selfref_func_annotated(self, **kw):
remote(self.composite_selfref.c.parent_id)
== self.composite_selfref.c.id,
),
- **kw
+ **kw,
)
def _join_fixture_compound_expression_1(self, **kw):
== relationships.remote(
relationships.foreign(self.right.c.x * self.right.c.y)
),
- **kw
+ **kw,
)
def _join_fixture_compound_expression_2(self, **kw):
self.right,
primaryjoin=(self.left.c.x + self.left.c.y)
== relationships.foreign(self.right.c.x * self.right.c.y),
- **kw
+ **kw,
)
def _join_fixture_compound_expression_1_non_annotated(self, **kw):
self.right,
primaryjoin=(self.left.c.x + self.left.c.y)
== (self.right.c.x * self.right.c.y),
- **kw
+ **kw,
)
def _join_fixture_base_to_joined_sub(self, **kw):
self.base_w_sub_rel,
self.rel_sub,
primaryjoin=self.base_w_sub_rel.c.sub_id == self.rel_sub.c.id,
- **kw
+ **kw,
)
def _join_fixture_o2m_joined_sub_to_base(self, **kw):
self.left,
self.right,
primaryjoin=self.left.c.id == foreign(func.foo(self.right.c.lid)),
- **kw
+ **kw,
)
def _join_fixture_o2m_to_oldstyle_func(self, **kw):
self.right,
primaryjoin=self.left.c.id == func.foo(self.right.c.lid),
consider_as_foreign_keys={self.right.c.lid},
- **kw
+ **kw,
)
def _join_fixture_overlapping_composite_fks(self, **kw):
self.composite_multi_ref.c.uid2,
self.composite_multi_ref.c.oid,
},
- **kw
+ **kw,
)
def _join_fixture_o2m_o_side_none(self, **kw):
primaryjoin=and_(
self.left.c.id == self.right.c.lid, self.left.c.x == 5
),
- **kw
+ **kw,
)
def _join_fixture_purely_single_o2m(self, **kw):
% (primary, expr, relname),
fn,
*arg,
- **kw
+ **kw,
)
def _assert_raises_no_equality(
% (primary, expr, relname),
fn,
*arg,
- **kw
+ **kw,
)
def _assert_raises_ambig_join(
% (relname, secondary_arg),
fn,
*arg,
- **kw
+ **kw,
)
else:
assert_raises_message(
% (relname,),
fn,
*arg,
- **kw
+ **kw,
)
def _assert_raises_no_join(self, fn, relname, secondary_arg, *arg, **kw):
"'secondaryjoin' expressions" % (relname, secondary_arg),
fn,
*arg,
- **kw
+ **kw,
)
else:
assert_raises_message(
"expression." % (relname,),
fn,
*arg,
- **kw
+ **kw,
)
datetime.datetime(2009, 10, 16, 12, 00, 00),
datetime.datetime(2009, 10, 18, 12, 00, 00),
),
- *loader_options
+ *loader_options,
)
)
if is_joined:
datetime.datetime(2009, 10, 15, 11, 00, 00),
datetime.datetime(2009, 10, 18, 12, 00, 00),
),
- *loader_options
+ *loader_options,
)
)
if is_joined:
% (primary, expr, relname),
fn,
*arg,
- **kw
+ **kw,
)
def _assert_raises_no_equality(
% (primary, expr, relname),
fn,
*arg,
- **kw
+ **kw,
)
def _assert_raises_ambig_join(
% (relname, secondary_arg),
fn,
*arg,
- **kw
+ **kw,
)
else:
assert_raises_message(
"foreign key reference to the parent table." % (relname,),
fn,
*arg,
- **kw
+ **kw,
)
def _assert_raises_no_join(self, fn, relname, secondary_arg, *arg, **kw):
"'secondaryjoin' expressions" % (relname, secondary_arg),
fn,
*arg,
- **kw
+ **kw,
)
else:
assert_raises_message(
"expression." % (relname,),
fn,
*arg,
- **kw
+ **kw,
)
def _assert_raises_ambiguous_direction(self, fn, relname, *arg, **kw):
"via the foreign_keys argument." % relname,
fn,
*arg,
- **kw
+ **kw,
)
def _assert_raises_no_local_remote(self, fn, relname, *arg, **kw):
% (relname),
fn,
*arg,
- **kw
+ **kw,
)
Column("a", Integer),
Column("b", Integer),
Column("c", Integer),
- *fkcs
+ *fkcs,
)
Table("remote", m, Column("id", Integer, primary_key=True))
(t, combination)
for i, combination in zip(range(10), combinations())
],
- argnames="t, idx_to_value"
+ argnames="t, idx_to_value",
)
@random_update_order_parameters()
"a",
metadata,
Column("foo", String, primary_key=True),
- **_kwargs
+ **_kwargs,
)
eq_(getattr(table_a, attrib), _attrib_value)
eq_(getattr(metadata.tables["a"], attrib), _attrib_value)
)
def test_repr_unicode(self):
- name = quoted_name(u"姓名", None)
+ name = quoted_name("姓名", None)
- eq_(repr(name), repr(u"姓名"))
+ eq_(repr(name), repr("姓名"))
def test_lower_case_names(self):
# Create table with quote defaults
("NAME", "name", False),
("NA ME", "NA ME", False),
("NaMe", "NaMe", False),
- (u"姓名", u"姓名", False),
+ ("姓名", "姓名", False),
("name", "name", True), # an all-lower case name needs quote forced
)
def test_name_normalize(self, original, normalized, is_quote):
("name", "NAME", False),
("NA ME", "NA ME", False),
("NaMe", "NaMe", False),
- (u"姓名", u"姓名", False),
+ ("姓名", "姓名", False),
(quoted_name("name", quote=True), "name", True),
)
def test_name_denormalize(self, original, denormalized, is_quote):
r"explicitly declared (?:with|as) text\(%(stmt)r\)"
% {"stmt": util.ellipses_string(offending_clause)},
fn,
- *arg
+ *arg,
)
def test_where(self):
def process_bind_param(self, value, dialect):
if value is None:
- value = u"<null value>"
+ value = "<null value>"
return "BIND_IN" + value
def process_result_value(self, value, dialect):
def process(value):
if value is None:
- value = u"<null value>"
+ value = "<null value>"
return "BIND_IN" + impl_processor(value)
*[v.name for v in values],
name="myenum",
native_enum=False,
- create_constraint=True
+ create_constraint=True,
)
# future method
(t, combination)
for i, combination in zip(range(10), combinations())
],
- argnames="t, idx_to_value"
+ argnames="t, idx_to_value",
)
@random_update_order_parameters()
flake8
flake8-import-order
flake8-builtins
- flake8-docstrings>=1.3.1
+ flake8-docstrings>=1.6.0
flake8-rst-docstrings
# flake8-rst-docstrings dependency, leaving it here
# in case it requires a version pin
pydocstyle
pygments
- black==21.5b1
+ black==21.12b0
commands =
flake8 ./lib/ ./test/ ./examples/ setup.py doc/build/conf.py {posargs}
black --check ./lib/ ./test/ ./examples/ setup.py doc/build/conf.py