- "ubuntu-latest"
python-version:
- "3.12"
+ - "3.13"
fail-fast: false
os:
- "ubuntu-latest"
python-version:
- - "3.11"
- "3.12"
- "3.13"
from __future__ import annotations
import os
+import pathlib
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
"""
config.print_stdout("Available templates:\n")
- for tempname in os.listdir(config.get_template_directory()):
- with open(
- os.path.join(config.get_template_directory(), tempname, "README")
- ) as readme:
+ for tempname in config._get_template_path().iterdir():
+ with (
+ config._get_template_path() / tempname / "README"
+ ).open() as readme:
synopsis = next(readme).rstrip()
config.print_stdout("%s - %s", tempname, synopsis)
"""
- if os.access(directory, os.F_OK) and os.listdir(directory):
+ directory_path = pathlib.Path(directory)
+ if directory_path.exists() and list(directory_path.iterdir()):
raise util.CommandError(
- "Directory %s already exists and is not empty" % directory
+ "Directory %s already exists and is not empty" % directory_path
)
- template_dir = os.path.join(config.get_template_directory(), template)
- if not os.access(template_dir, os.F_OK):
- raise util.CommandError("No such template %r" % template)
+ template_path = config._get_template_path() / template
- if not os.access(directory, os.F_OK):
+ if not template_path.exists():
+ raise util.CommandError("No such template {template_path}")
+
+ # left as os.access() to suit unit test mocking
+ if not os.access(directory_path, os.F_OK):
with util.status(
- f"Creating directory {os.path.abspath(directory)!r}",
+ f"Creating directory {directory_path.absolute()!r}",
**config.messaging_opts,
):
- os.makedirs(directory)
+ os.makedirs(directory_path)
- versions = os.path.join(directory, "versions")
+ versions = directory_path / "versions"
with util.status(
- f"Creating directory {os.path.abspath(versions)!r}",
+ f"Creating directory {versions.absolute()!r}",
**config.messaging_opts,
):
os.makedirs(versions)
- if not os.path.isabs(directory):
+ if not directory_path.is_absolute():
# for non-absolute path, state config file in .ini / pyproject
# as relative to the %(here)s token, which is where the config
# file itself would be
- if config.config_file_name is not None:
- rel_dir = util.relpath_via_abs_root(
- os.path.abspath(config.config_file_name), directory
+ if config._config_file_path is not None:
+ rel_dir = compat.path_relative_to(
+ directory_path.absolute(),
+ config._config_file_path.absolute().parent,
+ walk_up=True,
)
-
- ini_script_location_directory = os.path.join("%(here)s", rel_dir)
- if config.toml_file_name is not None:
- rel_dir = util.relpath_via_abs_root(
- os.path.abspath(config.toml_file_name), directory
+ ini_script_location_directory = ("%(here)s" / rel_dir).as_posix()
+ if config._toml_file_path is not None:
+ rel_dir = compat.path_relative_to(
+ directory_path.absolute(),
+ config._toml_file_path.absolute().parent,
+ walk_up=True,
)
-
- toml_script_location_directory = os.path.join("%(here)s", rel_dir)
+ toml_script_location_directory = ("%(here)s" / rel_dir).as_posix()
else:
- ini_script_location_directory = directory
- toml_script_location_directory = directory
+ ini_script_location_directory = directory_path.as_posix()
+ toml_script_location_directory = directory_path.as_posix()
- script = ScriptDirectory(directory)
+ script = ScriptDirectory(directory_path)
has_toml = False
- config_file: str | None = None
- for file_ in os.listdir(template_dir):
- file_path = os.path.join(template_dir, file_)
+ config_file: pathlib.Path | None = None
+
+ for file_path in template_path.iterdir():
+ file_ = file_path.name
if file_ == "alembic.ini.mako":
assert config.config_file_name is not None
- config_file = os.path.abspath(config.config_file_name)
- if os.access(config_file, os.F_OK):
+ config_file = pathlib.Path(config.config_file_name).absolute()
+ if config_file.exists():
util.msg(
f"File {config_file!r} already exists, skipping",
**config.messaging_opts,
)
elif file_ == "pyproject.toml.mako":
has_toml = True
- assert config.toml_file_name is not None
- toml_file = os.path.abspath(config.toml_file_name)
+ assert config._toml_file_path is not None
+ toml_path = config._toml_file_path.absolute()
- if os.access(toml_file, os.F_OK):
- with open(toml_file, "rb") as f:
+ if toml_path.exists():
+ # left as open() to suit unit test mocking
+ with open(toml_path, "rb") as f:
toml_data = compat.tomllib.load(f)
if "tool" in toml_data and "alembic" in toml_data["tool"]:
continue
script._append_template(
file_path,
- toml_file,
+ toml_path,
script_location=toml_script_location_directory,
)
else:
script._generate_template(
file_path,
- toml_file,
+ toml_path,
script_location=toml_script_location_directory,
)
- elif os.path.isfile(file_path):
- output_file = os.path.join(directory, file_)
+ elif file_path.is_file():
+ output_file = directory_path / file_
script._copy_file(file_path, output_file)
if package:
for path in [
- os.path.join(os.path.abspath(directory), "__init__.py"),
- os.path.join(os.path.abspath(versions), "__init__.py"),
+ directory_path.absolute() / "__init__.py",
+ versions.absolute() / "__init__.py",
]:
- with util.status(f"Adding {path!r}", **config.messaging_opts):
+ with util.status(f"Adding {path!s}", **config.messaging_opts):
+ # left as open() to suit unit test mocking
with open(path, "w"):
pass
if has_toml:
util.msg(
- f"Please edit configuration settings in {toml_file!r} and "
+ f"Please edit configuration settings in {toml_path!r} and "
"configuration/connection/logging "
f"settings in {config_file!r} before proceeding.",
**config.messaging_opts,
head: str = "head",
splice: bool = False,
branch_label: Optional[_RevIdType] = None,
- version_path: Optional[str] = None,
+ version_path: Union[str, os.PathLike[str], None] = None,
rev_id: Optional[str] = None,
depends_on: Optional[str] = None,
process_revision_directives: Optional[ProcessRevisionDirectiveFn] = None,
from configparser import ConfigParser
import inspect
import os
+from pathlib import Path
import re
import sys
from typing import Any
config_file_name: Union[str, os.PathLike[str], None] = None
"""Filesystem path to the .ini file in use."""
+ toml_file_name: Union[str, os.PathLike[str], None] = None
+ """Filesystem path to the pyproject.toml file in use.
+
+ .. versionadded:: 1.16.0
+
+ """
+
+ @property
+ def _config_file_path(self) -> Optional[Path]:
+ if self.config_file_name is None:
+ return None
+ return Path(self.config_file_name)
+
+ @property
+ def _toml_file_path(self) -> Optional[Path]:
+ if self.toml_file_name is None:
+ return None
+ return Path(self.toml_file_name)
+
config_ini_section: str = None # type:ignore[assignment]
"""Name of the config file section to read basic configuration
from. Defaults to ``alembic``, that is the ``[alembic]`` section
def file_config(self) -> ConfigParser:
"""Return the underlying ``ConfigParser`` object.
- Direct access to the .ini file is available here,
+ Dir*-ect access to the .ini file is available here,
though the :meth:`.Config.get_section` and
:meth:`.Config.get_main_option`
methods provide a possibly simpler interface.
"""
- if self.config_file_name:
- here = os.path.abspath(os.path.dirname(self.config_file_name))
+ if self._config_file_path:
+ here = self._config_file_path.absolute().parent
else:
- here = ""
- self.config_args["here"] = here
+ here = Path()
+ self.config_args["here"] = here.as_posix()
file_config = ConfigParser(self.config_args)
- if self.config_file_name:
- compat.read_config_parser(file_config, [self.config_file_name])
+ if self._config_file_path:
+ compat.read_config_parser(file_config, [self._config_file_path])
else:
file_config.add_section(self.config_ini_section)
return file_config
"""Return a dictionary of the [tool.alembic] section from
pyproject.toml"""
- if self.toml_file_name and os.path.exists(self.toml_file_name):
+ if self._toml_file_path and self._toml_file_path.exists():
- here = os.path.abspath(os.path.dirname(self.toml_file_name))
- self.toml_args["here"] = here
+ here = self._toml_file_path.absolute().parent
+ self.toml_args["here"] = here.as_posix()
- with open(self.toml_file_name, "rb") as f:
+ with open(self._toml_file_path, "rb") as f:
toml_data = compat.tomllib.load(f)
data = toml_data.get("tool", {}).get("alembic", {})
if not isinstance(data, dict):
This method is used by the alembic ``init`` and ``list_templates``
commands.
+ """
+ return self._get_template_path().as_posix()
+
+ def _get_template_path(self) -> Path:
+ """Return the directory where Alembic setup templates are found.
+
+ This method is used by the alembic ``init`` and ``list_templates``
+ commands.
+
+ .. versionadded:: 1.16.0
+
"""
import alembic
- package_dir = os.path.abspath(os.path.dirname(alembic.__file__))
- return os.path.join(package_dir, "templates")
+ package_dir = Path(alembic.__file__).absolute().parent
+ return package_dir / "templates"
@overload
def get_section(
from __future__ import annotations
from abc import abstractmethod
+import os
+import pathlib
import re
from typing import Any
from typing import Callable
head: Optional[str] = None,
splice: Optional[bool] = None,
branch_label: Optional[_RevIdType] = None,
- version_path: Optional[str] = None,
+ version_path: Union[str, os.PathLike[str], None] = None,
depends_on: Optional[_RevIdType] = None,
) -> None:
self.rev_id = rev_id
self.head = head
self.splice = splice
self.branch_label = branch_label
- self.version_path = version_path
+ self.version_path = (
+ pathlib.Path(version_path).as_posix() if version_path else None
+ )
self.depends_on = depends_on
self.upgrade_ops = upgrade_ops
self.downgrade_ops = downgrade_ops
from contextlib import contextmanager
import datetime
import os
+from pathlib import Path
import re
import shutil
import sys
from ..runtime import migration
from ..util import compat
from ..util import not_none
+from ..util.pyfiles import _preserving_path_as_str
if TYPE_CHECKING:
from .revision import _GetRevArg
def __init__(
self,
- dir: str, # noqa
+ dir: Union[str, os.PathLike[str]], # noqa: A002
file_template: str = _default_file_template,
truncate_slug_length: Optional[int] = 40,
- version_locations: Optional[List[str]] = None,
+ version_locations: Optional[
+ Sequence[Union[str, os.PathLike[str]]]
+ ] = None,
sourceless: bool = False,
output_encoding: str = "utf-8",
timezone: Optional[str] = None,
"MessagingOptions", util.EMPTY_DICT
),
) -> None:
- self.dir = dir
+ self.dir = _preserving_path_as_str(dir)
+ self.version_locations = [
+ _preserving_path_as_str(p) for p in version_locations or ()
+ ]
self.file_template = file_template
- self.version_locations = version_locations
self.truncate_slug_length = truncate_slug_length or 40
self.sourceless = sourceless
self.output_encoding = output_encoding
if not os.access(dir, os.F_OK):
raise util.CommandError(
- "Path doesn't exist: %r. Please use "
+ f"Path doesn't exist: {dir}. Please use "
"the 'init' command to create a new "
- "scripts folder." % os.path.abspath(dir)
+ "scripts folder."
)
@property
def versions(self) -> str:
+ """return a single version location based on the sole path passed
+ within version_locations.
+
+ If multiple version locations are configured, an error is raised.
+
+
+ """
+ return str(self._singular_version_location)
+
+ @util.memoized_property
+ def _singular_version_location(self) -> Path:
loc = self._version_locations
if len(loc) > 1:
raise util.CommandError("Multiple version_locations present")
return loc[0]
@util.memoized_property
- def _version_locations(self) -> Sequence[str]:
+ def _version_locations(self) -> Sequence[Path]:
if self.version_locations:
return [
- os.path.abspath(util.coerce_resource_to_filename(location))
+ util.coerce_resource_to_filename(location).absolute()
for location in self.version_locations
]
else:
- return (os.path.abspath(os.path.join(self.dir, "versions")),)
+ return [Path(self.dir, "versions").absolute()]
def _load_revisions(self) -> Iterator[Script]:
- if self.version_locations:
- paths = [
- vers
- for vers in self._version_locations
- if os.path.exists(vers)
- ]
- else:
- paths = [self.versions]
+ paths = [vers for vers in self._version_locations if vers.exists()]
dupes = set()
for vers in paths:
for file_path in Script._list_py_dir(self, vers):
- real_path = os.path.realpath(file_path)
+ real_path = file_path.resolve()
if real_path in dupes:
util.warn(
- "File %s loaded twice! ignoring. Please ensure "
- "version_locations is unique." % real_path
+ f"File {real_path} loaded twice! ignoring. "
+ "Please ensure version_locations is unique."
)
continue
dupes.add(real_path)
- filename = os.path.basename(real_path)
- dir_name = os.path.dirname(real_path)
- script = Script._from_filename(self, dir_name, filename)
+ script = Script._from_path(self, real_path)
if script is None:
continue
yield script
@property
def env_py_location(self) -> str:
- return os.path.abspath(os.path.join(self.dir, "env.py"))
+ return str(Path(self.dir, "env.py"))
- def _append_template(self, src: str, dest: str, **kw: Any) -> None:
+ def _append_template(self, src: Path, dest: Path, **kw: Any) -> None:
with util.status(
- f"Appending to existing {os.path.abspath(dest)}",
+ f"Appending to existing {dest.absolute()}",
**self.messaging_opts,
):
util.template_to_file(
src, dest, self.output_encoding, append=True, **kw
)
- def _generate_template(self, src: str, dest: str, **kw: Any) -> None:
+ def _generate_template(self, src: Path, dest: Path, **kw: Any) -> None:
with util.status(
- f"Generating {os.path.abspath(dest)}", **self.messaging_opts
+ f"Generating {dest.absolute()}", **self.messaging_opts
):
util.template_to_file(src, dest, self.output_encoding, **kw)
- def _copy_file(self, src: str, dest: str) -> None:
+ def _copy_file(self, src: Path, dest: Path) -> None:
with util.status(
- f"Generating {os.path.abspath(dest)}", **self.messaging_opts
+ f"Generating {dest.absolute()}", **self.messaging_opts
):
shutil.copy(src, dest)
- def _ensure_directory(self, path: str) -> None:
- path = os.path.abspath(path)
- if not os.path.exists(path):
+ def _ensure_directory(self, path: Path) -> None:
+ path = path.absolute()
+ if not path.exists():
with util.status(
f"Creating directory {path}", **self.messaging_opts
):
head: Optional[_RevIdType] = None,
splice: Optional[bool] = False,
branch_labels: Optional[_RevIdType] = None,
- version_path: Optional[str] = None,
+ version_path: Union[str, os.PathLike[str], None] = None,
+ file_template: Optional[str] = None,
depends_on: Optional[_RevIdType] = None,
**kw: Any,
) -> Optional[Script]:
for head_ in heads:
if head_ is not None:
assert isinstance(head_, Script)
- version_path = os.path.dirname(head_.path)
+ version_path = head_._script_path.parent
break
else:
raise util.CommandError(
"please specify --version-path"
)
else:
- version_path = self.versions
+ version_path = self._singular_version_location
+ else:
+ version_path = Path(version_path)
- norm_path = os.path.normpath(os.path.abspath(version_path))
+ assert isinstance(version_path, Path)
+ norm_path = version_path.absolute()
for vers_path in self._version_locations:
- if os.path.normpath(vers_path) == norm_path:
+ if vers_path.absolute() == norm_path:
break
else:
raise util.CommandError(
- "Path %s is not represented in current "
- "version locations" % version_path
+ f"Path {version_path} is not represented in current "
+ "version locations"
)
if self.version_locations:
resolved_depends_on = None
self._generate_template(
- os.path.join(self.dir, "script.py.mako"),
+ Path(self.dir, "script.py.mako"),
path,
up_revision=str(revid),
down_revision=revision.tuple_rev_as_scalar(
def _rev_path(
self,
- path: str,
+ path: Union[str, os.PathLike[str]],
rev_id: str,
message: Optional[str],
create_date: datetime.datetime,
- ) -> str:
+ ) -> Path:
epoch = int(create_date.timestamp())
slug = "_".join(_slug_re.findall(message or "")).lower()
if len(slug) > self.truncate_slug_length:
"second": create_date.second,
}
)
- return os.path.join(path, filename)
+ return Path(path) / filename
class Script(revision.Revision):
"""
- def __init__(self, module: ModuleType, rev_id: str, path: str):
+ def __init__(
+ self,
+ module: ModuleType,
+ rev_id: str,
+ path: Union[str, os.PathLike[str]],
+ ):
self.module = module
- self.path = path
+ self.path = _preserving_path_as_str(path)
super().__init__(
rev_id,
module.down_revision,
path: str
"""Filesystem path of the script."""
+ @property
+ def _script_path(self) -> Path:
+ return Path(self.path)
+
_db_current_indicator: Optional[bool] = None
"""Utility variable which when set will cause string output to indicate
this is a "current" version in some database"""
return util.format_as_comma(self._versioned_down_revisions)
@classmethod
- def _from_path(
- cls, scriptdir: ScriptDirectory, path: str
- ) -> Optional[Script]:
- dir_, filename = os.path.split(path)
- return cls._from_filename(scriptdir, dir_, filename)
-
- @classmethod
- def _list_py_dir(cls, scriptdir: ScriptDirectory, path: str) -> List[str]:
+ def _list_py_dir(
+ cls, scriptdir: ScriptDirectory, path: Path
+ ) -> List[Path]:
paths = []
- for root, dirs, files in os.walk(path, topdown=True):
- if root.endswith("__pycache__"):
+ for root, dirs, files in compat.path_walk(path, top_down=True):
+ if root.name.endswith("__pycache__"):
# a special case - we may include these files
# if a `sourceless` option is specified
continue
for filename in sorted(files):
- paths.append(os.path.join(root, filename))
+ paths.append(root / filename)
if scriptdir.sourceless:
# look for __pycache__
- py_cache_path = os.path.join(root, "__pycache__")
- if os.path.exists(py_cache_path):
+ py_cache_path = root / "__pycache__"
+ if py_cache_path.exists():
# add all files from __pycache__ whose filename is not
# already in the names we got from the version directory.
# add as relative paths including __pycache__ token
- names = {filename.split(".")[0] for filename in files}
+ names = {
+ Path(filename).stem.split(".")[0] for filename in files
+ }
paths.extend(
- os.path.join(py_cache_path, pyc)
- for pyc in os.listdir(py_cache_path)
- if pyc.split(".")[0] not in names
+ py_cache_path / pyc
+ for pyc in py_cache_path.iterdir()
+ if pyc.stem.split(".")[0] not in names
)
if not scriptdir.recursive_version_locations:
return paths
@classmethod
- def _from_filename(
- cls, scriptdir: ScriptDirectory, dir_: str, filename: str
+ def _from_path(
+ cls, scriptdir: ScriptDirectory, path: Union[str, os.PathLike[str]]
) -> Optional[Script]:
+
+ path = Path(path)
+ dir_, filename = path.parent, path.name
+
if scriptdir.sourceless:
py_match = _sourceless_rev_file.match(filename)
else:
is_c = is_o = False
if is_o or is_c:
- py_exists = os.path.exists(os.path.join(dir_, py_filename))
- pyc_exists = os.path.exists(os.path.join(dir_, py_filename + "c"))
+ py_exists = (dir_ / py_filename).exists()
+ pyc_exists = (dir_ / (py_filename + "c")).exists()
# prefer .py over .pyc because we'd like to get the
# source encoding; prefer .pyc over .pyo because we'd like to
m = _legacy_rev.match(filename)
if not m:
raise util.CommandError(
- "Could not determine revision id from filename %s. "
+ "Could not determine revision id from "
+ f"filename {filename}. "
"Be sure the 'revision' variable is "
"declared inside the script (please see 'Upgrading "
"from Alembic 0.1 to 0.2' in the documentation)."
- % filename
)
else:
revision = m.group(1)
else:
revision = module.revision
- return Script(module, revision, os.path.join(dir_, filename))
+ return Script(module, revision, dir_ / filename)
from __future__ import annotations
+import os
import shlex
import subprocess
import sys
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
+from typing import Union
from .. import util
from ..util import compat
+from ..util.pyfiles import _preserving_path_as_str
if TYPE_CHECKING:
from ..config import PostWriteHookConfig
return decorate
-def _invoke(name: str, revision: str, options: PostWriteHookConfig) -> Any:
+def _invoke(
+ name: str,
+ revision_path: Union[str, os.PathLike[str]],
+ options: PostWriteHookConfig,
+) -> Any:
"""Invokes the formatter registered for the given name.
:param name: The name of a formatter in the registry
- :param revision: A :class:`.MigrationRevision` instance
+ :param revision: string path to the revision file
:param options: A dict containing kwargs passed to the
specified formatter.
:raises: :class:`alembic.util.CommandError`
"""
+ revision_path = _preserving_path_as_str(revision_path)
try:
hook = _registry[name]
except KeyError as ke:
f"No formatter with name '{name}' registered"
) from ke
else:
- return hook(revision, options)
+ return hook(revision_path, options)
-def _run_hooks(path: str, hooks: list[PostWriteHookConfig]) -> None:
+def _run_hooks(
+ path: Union[str, os.PathLike[str]], hooks: list[PostWriteHookConfig]
+) -> None:
"""Invoke hooks for a generated revision."""
for hook in hooks:
from .pyfiles import coerce_resource_to_filename as coerce_resource_to_filename
from .pyfiles import load_python_file as load_python_file
from .pyfiles import pyc_file_from_path as pyc_file_from_path
-from .pyfiles import relpath_via_abs_root as relpath_via_abs_root
from .pyfiles import template_to_file as template_to_file
from .sqla_compat import sqla_2 as sqla_2
from configparser import ConfigParser
import io
import os
+from pathlib import Path
import sys
import typing
from typing import Any
+from typing import Iterator
from typing import List
from typing import Optional
from typing import Sequence
py314 = sys.version_info >= (3, 14)
py313 = sys.version_info >= (3, 13)
+py312 = sys.version_info >= (3, 12)
py311 = sys.version_info >= (3, 11)
py310 = sys.version_info >= (3, 10)
py39 = sys.version_info >= (3, 9)
import tomli as tomllib # type: ignore # noqa
+if py312:
+
+ def path_walk(
+ path: Path, *, top_down: bool = True
+ ) -> Iterator[tuple[Path, list[str], list[str]]]:
+ return Path.walk(path)
+
+ def path_relative_to(
+ path: Path, other: Path, *, walk_up: bool = False
+ ) -> Path:
+ return path.relative_to(other, walk_up=walk_up)
+
+else:
+
+ def path_walk(
+ path: Path, *, top_down: bool = True
+ ) -> Iterator[tuple[Path, list[str], list[str]]]:
+ for root, dirs, files in os.walk(path, topdown=top_down):
+ yield Path(root), dirs, files
+
+ def path_relative_to(
+ path: Path, other: Path, *, walk_up: bool = False
+ ) -> Path:
+ """
+ Calculate the relative path of 'path' with respect to 'other',
+ optionally allowing 'path' to be outside the subtree of 'other'.
+
+ OK I used AI for this, sorry
+
+ """
+ try:
+ return path.relative_to(other)
+ except ValueError:
+ if walk_up:
+ other_ancestors = list(other.parents) + [other]
+ for ancestor in other_ancestors:
+ try:
+ return path.relative_to(ancestor)
+ except ValueError:
+ continue
+ raise ValueError(
+ f"{path} is not in the same subtree as {other}"
+ )
+ else:
+ raise
+
+
def importlib_metadata_get(group: str) -> Sequence[EntryPoint]:
ep = importlib_metadata.entry_points()
if hasattr(ep, "select"):
import importlib.machinery
import importlib.util
import os
+import pathlib
import re
import tempfile
from types import ModuleType
from typing import Any
from typing import Optional
+from typing import Union
from mako import exceptions
from mako.template import Template
def template_to_file(
- template_file: str,
- dest: str,
+ template_file: Union[str, os.PathLike[str]],
+ dest: Union[str, os.PathLike[str]],
output_encoding: str,
*,
append: bool = False,
**kw: Any,
) -> None:
- template = Template(filename=template_file)
+ template = Template(filename=_preserving_path_as_str(template_file))
try:
output = template.render_unicode(**kw).encode(output_encoding)
except:
f.write(output)
-def coerce_resource_to_filename(fname: str) -> str:
+def coerce_resource_to_filename(fname_or_resource: str) -> pathlib.Path:
"""Interpret a filename as either a filesystem location or as a package
resource.
are interpreted as resources and coerced to a file location.
"""
- if not os.path.isabs(fname) and ":" in fname:
- tokens = fname.split(":")
+ # TODO: there seem to be zero tests for the package resource codepath
+ if not os.path.isabs(fname_or_resource) and ":" in fname_or_resource:
+ tokens = fname_or_resource.split(":")
# from https://importlib-resources.readthedocs.io/en/latest/migration.html#pkg-resources-resource-filename # noqa E501
ref = compat.importlib_resources.files(tokens[0])
for tok in tokens[1:]:
ref = ref / tok
- fname = file_manager.enter_context( # type: ignore[assignment]
+ fname_or_resource = file_manager.enter_context( # type: ignore[assignment] # noqa: E501
compat.importlib_resources.as_file(ref)
)
- return fname
+ return pathlib.Path(fname_or_resource)
-def pyc_file_from_path(path: str) -> Optional[str]:
+def pyc_file_from_path(
+ path: Union[str, os.PathLike[str]],
+) -> Optional[pathlib.Path]:
"""Given a python source path, locate the .pyc."""
- candidate = importlib.util.cache_from_source(path)
- if os.path.exists(candidate):
+ pathpath = pathlib.Path(path)
+ candidate = pathlib.Path(
+ importlib.util.cache_from_source(pathpath.as_posix())
+ )
+ if candidate.exists():
return candidate
# even for pep3147, fall back to the old way of finding .pyc files,
# to support sourceless operation
- filepath, ext = os.path.splitext(path)
+ ext = pathpath.suffix
for ext in importlib.machinery.BYTECODE_SUFFIXES:
- if os.path.exists(filepath + ext):
- return filepath + ext
+ if pathpath.with_suffix(ext).exists():
+ return pathpath.with_suffix(ext)
else:
return None
-def load_python_file(dir_: str, filename: str) -> ModuleType:
+def load_python_file(
+ dir_: Union[str, os.PathLike[str]], filename: Union[str, os.PathLike[str]]
+) -> ModuleType:
"""Load a file from the given path as a Python module."""
+ dir_ = pathlib.Path(dir_)
+ filename_as_path = pathlib.Path(filename)
+ filename = filename_as_path.name
+
module_id = re.sub(r"\W", "_", filename)
- path = os.path.join(dir_, filename)
- _, ext = os.path.splitext(filename)
+ path = dir_ / filename
+ ext = path.suffix
if ext == ".py":
- if os.path.exists(path):
+ if path.exists():
module = load_module_py(module_id, path)
else:
pyc_path = pyc_file_from_path(path)
return module
-def load_module_py(module_id: str, path: str) -> ModuleType:
+def load_module_py(
+ module_id: str, path: Union[str, os.PathLike[str]]
+) -> ModuleType:
spec = importlib.util.spec_from_file_location(module_id, path)
assert spec
module = importlib.util.module_from_spec(spec)
return module
-def relpath_via_abs_root(root: str, relative_path: str) -> str:
- abs_root = os.path.abspath(root)
- abs_path = os.path.abspath(relative_path)
- return abs_path[len(os.path.commonpath([abs_root, abs_path])) + 1 :]
+def _preserving_path_as_str(path: Union[str, os.PathLike[str]]) -> str:
+ """receive str/pathlike and return a string.
+
+ Does not convert an incoming string path to a Path first, to help with
+ unit tests that are doing string path round trips without OS-specific
+ processing if not necessary.
+
+ """
+ if isinstance(path, str):
+ return path
+ elif isinstance(path, pathlib.PurePath):
+ return str(path)
+ else:
+ return str(pathlib.Path(path))
--- /dev/null
+.. change::
+ :tags: change, environment
+
+ The command, config and script modules now rely on ``pathlib.Path`` for
+ internal path manipulations, instead of ``os.path()`` operations. Public
+ API functions that accept string directories and filenames continue to do
+ so but also accept ``os.PathLike`` objects. Public API functions and
+ accessors that return paths as strings continue to do so. Private API
+ functions and accessors, i.e. all those that are prefixed with an
+ underscore, may now return a Path object rather than a string to indicate
+ file paths.
from io import StringIO
from io import TextIOWrapper
import os
+import pathlib
import re
from typing import cast
def test_init_file_exists_and_is_empty(self):
def access_(path, mode):
- if "generic" in path or path == "foobar":
+ if "generic" in str(path) or str(path) == "foobar":
return True
else:
return False
def listdir_(path):
- if path == "foobar":
+ if str(path) == "foobar":
return []
else:
return ["file1", "file2", "alembic.ini.mako"]
command.init(self.cfg, directory="foobar")
eq_(
makedirs.mock_calls,
- [mock.call(os.path.normpath("foobar/versions"))],
+ [mock.call(pathlib.Path("foobar/versions"))],
)
def test_init_file_doesnt_exist(self):
def access_(path, mode):
- if "generic" in path:
+ if "generic" in str(path):
return True
else:
return False
eq_(
makedirs.mock_calls,
[
- mock.call("foobar"),
- mock.call(os.path.normpath("foobar/versions")),
+ mock.call(pathlib.Path("foobar")),
+ mock.call(pathlib.Path("foobar/versions")),
],
)
open_.mock_calls,
[
mock.call(
- os.path.abspath(os.path.join(path, "__init__.py")), "w"
+ (pathlib.Path(path, "__init__.py")).absolute(), "w"
),
mock.call().__enter__(),
mock.call().__exit__(None, None, None),
mock.call(
- os.path.abspath(
- os.path.join(path, "versions", "__init__.py")
- ),
+ pathlib.Path(
+ path, "versions", "__init__.py"
+ ).absolute(),
"w",
),
mock.call().__enter__(),
-import os
+from pathlib import Path
import sys
from alembic import command
"arg1",
rev_path,
"--config",
- os.path.abspath(_get_staging_directory()) + "/pyproject.toml",
+ Path(_get_staging_directory(), "pyproject.toml")
+ .absolute()
+ .as_posix(),
]
self._run_black_with_config(
self._run_ruff_with_config(
input_config,
expected_additional_arguments_fn,
- executable=os.path.abspath(_get_staging_directory())
- + "/.venv/bin/ruff",
+ executable=Path(_get_staging_directory(), ".venv/bin/ruff")
+ .absolute()
+ .as_posix(),
)
@combinations(True, False)
"arg1",
rev_path,
"--config",
- os.path.abspath(_get_staging_directory()) + "/pyproject.toml",
+ Path(_get_staging_directory(), "pyproject.toml")
+ .absolute()
+ .as_posix(),
]
self._run_ruff_with_config(
assert_raises_message(
util.CommandError,
- "Could not determine revision id from filename foobar_%s.py. "
+ f"Could not determine revision id from filename foobar_{a}.py. "
"Be sure the 'revision' variable is declared "
- "inside the script." % a,
+ "inside the script.",
Script._from_path,
script,
path,
script._rev_path(
script.versions, "12345", "this is a message", create_date
),
- os.path.abspath(
+ Path(
"%s/versions/12345_this_is_a_"
"message_2012_7_25_15_8_5.py" % _get_staging_directory()
- ),
+ ).absolute(),
)
@testing.combinations(
script._rev_path(
script.versions, "12345", "this is a message", create_date
),
- os.path.abspath(expected % _get_staging_directory()),
+ Path(expected % _get_staging_directory()).absolute(),
)
def _test_tz(self, timezone_arg, given, expected):
def test_multiple_dir_no_bases_invalid_version_path(self):
assert_raises_message(
util.CommandError,
- "Path foo/bar/ is not represented in current version locations",
+ r"Path foo[/\\]bar is not represented in current version "
+ "locations",
command.revision,
self.cfg,
message="x",
script = ScriptDirectory.from_config(config)
- def normpath(path):
- return path.replace(os.pathsep, ":NORM:")
-
- normpath = mock.Mock(side_effect=normpath)
-
- with mock.patch("os.path.normpath", normpath):
- eq_(
- script._version_locations,
- (
- os.path.abspath(
- os.path.join(
- _get_staging_directory(), "scripts", "versions"
- )
- ).replace(os.pathsep, ":NORM:"),
- ),
- )
+ eq_(
+ script._version_locations,
+ [Path(_get_staging_directory(), "scripts", "versions").absolute()],
+ )
- eq_(
- script.versions,
- os.path.abspath(
- os.path.join(
- _get_staging_directory(), "scripts", "versions"
- )
- ).replace(os.pathsep, ":NORM:"),
- )
+ eq_(
+ Path(script.versions),
+ Path(_get_staging_directory(), "scripts", "versions").absolute(),
+ )
def test_script_location_multiple(self):
config = _multi_dir_testing_config()
script = ScriptDirectory.from_config(config)
- def _normpath(path):
- return path.replace(os.pathsep, ":NORM:")
-
- normpath = mock.Mock(side_effect=_normpath)
-
- with mock.patch("os.path.normpath", normpath):
- sd = Path(_get_staging_directory()).as_posix()
- eq_(
- script._version_locations,
- [
- _normpath(os.path.abspath(sd + "/model1/")),
- _normpath(os.path.abspath(sd + "/model2/")),
- _normpath(os.path.abspath(sd + "/model3/")),
- ],
- )
+ sd = Path(_get_staging_directory()).as_posix()
+ eq_(
+ script._version_locations,
+ [
+ Path(sd, "model1").absolute(),
+ Path(sd, "model2").absolute(),
+ Path(sd, "model3").absolute(),
+ ],
+ )
[testenv:pep484]
-basepython = python3
+basepython =
+ python312
+ python313
deps=
mypy
sqlalchemy>=2