except NameError:
basestring = str ## pylint: disable=redefined-builtin
-WIN32 = sys.platform == 'win32'
+WIN32 = sys.platform == "win32"
if WIN32:
PLT_CFG = {
- 'close_fds': False,
+ "close_fds": False,
}
else:
PLT_CFG = {
- 'close_fds': True,
+ "close_fds": True,
}
##
import ctypes
import subprocess
import _subprocess
- from ctypes import byref, windll, c_char_p, c_wchar_p, c_void_p, \
- Structure, sizeof, c_wchar, WinError
- from ctypes.wintypes import BYTE, WORD, LPWSTR, BOOL, DWORD, LPVOID, \
- HANDLE
-
+ from ctypes import (
+ byref,
+ windll,
+ c_char_p,
+ c_wchar_p,
+ c_void_p,
+ Structure,
+ sizeof,
+ c_wchar,
+ WinError,
+ )
+ from ctypes.wintypes import BYTE, WORD, LPWSTR, BOOL, DWORD, LPVOID, HANDLE
##
## Types
LPCTSTR = c_char_p
LPTSTR = c_wchar_p
LPSECURITY_ATTRIBUTES = c_void_p
- LPBYTE = ctypes.POINTER(BYTE)
+ LPBYTE = ctypes.POINTER(BYTE)
class STARTUPINFOW(Structure):
_fields_ = [
- ("cb", DWORD), ("lpReserved", LPWSTR),
- ("lpDesktop", LPWSTR), ("lpTitle", LPWSTR),
- ("dwX", DWORD), ("dwY", DWORD),
- ("dwXSize", DWORD), ("dwYSize", DWORD),
- ("dwXCountChars", DWORD), ("dwYCountChars", DWORD),
- ("dwFillAtrribute", DWORD), ("dwFlags", DWORD),
- ("wShowWindow", WORD), ("cbReserved2", WORD),
- ("lpReserved2", LPBYTE), ("hStdInput", HANDLE),
- ("hStdOutput", HANDLE), ("hStdError", HANDLE),
+ ("cb", DWORD),
+ ("lpReserved", LPWSTR),
+ ("lpDesktop", LPWSTR),
+ ("lpTitle", LPWSTR),
+ ("dwX", DWORD),
+ ("dwY", DWORD),
+ ("dwXSize", DWORD),
+ ("dwYSize", DWORD),
+ ("dwXCountChars", DWORD),
+ ("dwYCountChars", DWORD),
+ ("dwFillAtrribute", DWORD),
+ ("dwFlags", DWORD),
+ ("wShowWindow", WORD),
+ ("cbReserved2", WORD),
+ ("lpReserved2", LPBYTE),
+ ("hStdInput", HANDLE),
+ ("hStdOutput", HANDLE),
+ ("hStdError", HANDLE),
]
LPSTARTUPINFOW = ctypes.POINTER(STARTUPINFOW)
-
class PROCESS_INFORMATION(Structure):
_fields_ = [
- ("hProcess", HANDLE), ("hThread", HANDLE),
- ("dwProcessId", DWORD), ("dwThreadId", DWORD),
+ ("hProcess", HANDLE),
+ ("hThread", HANDLE),
+ ("dwProcessId", DWORD),
+ ("dwThreadId", DWORD),
]
LPPROCESS_INFORMATION = ctypes.POINTER(PROCESS_INFORMATION)
-
class DUMMY_HANDLE(ctypes.c_void_p):
def __init__(self, *a, **kw):
def __int__(self):
return self.value
-
CreateProcessW = windll.kernel32.CreateProcessW
CreateProcessW.argtypes = [
- LPCTSTR, LPTSTR, LPSECURITY_ATTRIBUTES,
- LPSECURITY_ATTRIBUTES, BOOL, DWORD, LPVOID, LPCTSTR,
- LPSTARTUPINFOW, LPPROCESS_INFORMATION,
+ LPCTSTR,
+ LPTSTR,
+ LPSECURITY_ATTRIBUTES,
+ LPSECURITY_ATTRIBUTES,
+ BOOL,
+ DWORD,
+ LPVOID,
+ LPCTSTR,
+ LPSTARTUPINFOW,
+ LPPROCESS_INFORMATION,
]
CreateProcessW.restype = BOOL
-
##
## Patched functions/classes
##
- def CreateProcess(executable, args, _p_attr, _t_attr,
- inherit_handles, creation_flags, env, cwd,
- startup_info):
+ def CreateProcess(
+ executable,
+ args,
+ _p_attr,
+ _t_attr,
+ inherit_handles,
+ creation_flags,
+ env,
+ cwd,
+ startup_info,
+ ):
"""Create a process supporting unicode executable and args for win32
Python implementation of CreateProcess using CreateProcessW for Win32
wenv = None
if env is not None:
## LPCWSTR seems to be c_wchar_p, so let's say CWSTR is c_wchar
- env = (unicode("").join([
- unicode("%s=%s\0") % (k, v)
- for k, v in env.items()])) + unicode("\0")
+ env = (
+ unicode("").join([unicode("%s=%s\0") % (k, v) for k, v in env.items()])
+ ) + unicode("\0")
wenv = (c_wchar * len(env))()
wenv.value = env
pi = PROCESS_INFORMATION()
creation_flags |= CREATE_UNICODE_ENVIRONMENT
- if CreateProcessW(executable, args, None, None,
- inherit_handles, creation_flags,
- wenv, cwd, byref(si), byref(pi)):
- return (DUMMY_HANDLE(pi.hProcess), DUMMY_HANDLE(pi.hThread),
- pi.dwProcessId, pi.dwThreadId)
+ if CreateProcessW(
+ executable,
+ args,
+ None,
+ None,
+ inherit_handles,
+ creation_flags,
+ wenv,
+ cwd,
+ byref(si),
+ byref(pi),
+ ):
+ return (
+ DUMMY_HANDLE(pi.hProcess),
+ DUMMY_HANDLE(pi.hThread),
+ pi.dwProcessId,
+ pi.dwThreadId,
+ )
raise WinError()
-
class Popen(subprocess.Popen):
"""This superseeds Popen and corrects a bug in cPython 2.7 implem"""
- def _execute_child(self, args, executable, preexec_fn, close_fds,
- cwd, env, universal_newlines,
- startupinfo, creationflags, shell, to_close,
- p2cread, p2cwrite,
- c2pread, c2pwrite,
- errread, errwrite):
+ def _execute_child(
+ self,
+ args,
+ executable,
+ preexec_fn,
+ close_fds,
+ cwd,
+ env,
+ universal_newlines,
+ startupinfo,
+ creationflags,
+ shell,
+ to_close,
+ p2cread,
+ p2cwrite,
+ c2pread,
+ c2pwrite,
+ errread,
+ errwrite,
+ ):
"""Code from part of _execute_child from Python 2.7 (9fbb65e)
There are only 2 little changes concerning the construction of
startupinfo.wShowWindow = _subprocess.SW_HIDE
comspec = os.environ.get("COMSPEC", unicode("cmd.exe"))
args = unicode('{} /c "{}"').format(comspec, args)
- if (_subprocess.GetVersion() >= 0x80000000 or
- os.path.basename(comspec).lower() == "command.com"):
+ if (
+ _subprocess.GetVersion() >= 0x80000000
+ or os.path.basename(comspec).lower() == "command.com"
+ ):
w9xpopen = self._find_w9xpopen()
args = unicode('"%s" %s') % (w9xpopen, args)
creationflags |= _subprocess.CREATE_NEW_CONSOLE
- super(Popen, self)._execute_child(args, executable,
- preexec_fn, close_fds, cwd, env, universal_newlines,
- startupinfo, creationflags, False, to_close, p2cread,
- p2cwrite, c2pread, c2pwrite, errread, errwrite)
+ super(Popen, self)._execute_child(
+ args,
+ executable,
+ preexec_fn,
+ close_fds,
+ cwd,
+ env,
+ universal_newlines,
+ startupinfo,
+ creationflags,
+ False,
+ to_close,
+ p2cread,
+ p2cwrite,
+ c2pread,
+ c2pwrite,
+ errread,
+ errwrite,
+ )
_subprocess.CreateProcess = CreateProcess
## Shell command helper functions
##
+
def stderr(msg):
print(msg, file=sys.stderr)
"""
- return '\n'.join(
- str(prefix + line)
- for line in traceback.format_exc().strip().split('\n'))
+ return "\n".join(
+ str(prefix + line) for line in traceback.format_exc().strip().split("\n")
+ )
##
##
_config_env = {
- 'WIN32': WIN32,
- 'PY3': PY3,
+ "WIN32": WIN32,
+ "PY3": PY3,
}
return f
-def load_config_file(filename, default_filename=None,
- fail_if_not_present=True):
+def load_config_file(filename, default_filename=None, fail_if_not_present=True):
"""Loads data from a config file."""
config = _config_env.copy()
for fname in [default_filename, filename]:
if fname and os.path.exists(fname):
if not os.path.isfile(fname):
- die("config file path '%s' exists but is not a file !"
- % (fname, ))
+ die("config file path '%s' exists but is not a file !" % (fname,))
content = file_get_contents(fname)
try:
- code = compile(content, fname, 'exec')
+ code = compile(content, fname, "exec")
exec(code, config) ## pylint: disable=exec-used
except SyntaxError as e:
- die('Syntax error in config file: %s\n%s'
- 'File %s, line %i'
- % (str(e),
- (indent(e.text.rstrip(), " | ") + "\n") if e.text else "",
- e.filename, e.lineno))
+ die(
+ "Syntax error in config file: %s\n%s"
+ "File %s, line %i"
+ % (
+ str(e),
+ (indent(e.text.rstrip(), " | ") + "\n") if e.text else "",
+ e.filename,
+ e.lineno,
+ )
+ )
else:
if fail_if_not_present:
- die('%s config file is not found and is required.' % (fname, ))
+ die("%s config file is not found and is required." % (fname,))
return config
## Text functions
##
+
@available_in_config
class TextProc(object):
if isinstance(value, TextProc):
return TextProc(lambda text: value.fun(self.fun(text)))
import inspect
- (_frame, filename, lineno, _function_name, lines, _index) = \
- inspect.stack()[1]
- raise SyntaxError("Invalid syntax in config file",
- (filename, lineno, 0,
- "Invalid chain with a non TextProc element %r:\n%s"
- % (value, indent("".join(lines).strip(), " | "))))
+
+ (_frame, filename, lineno, _function_name, lines, _index) = inspect.stack()[1]
+ raise SyntaxError(
+ "Invalid syntax in config file",
+ (
+ filename,
+ lineno,
+ 0,
+ "Invalid chain with a non TextProc element %r:\n%s"
+ % (value, indent("".join(lines).strip(), " | ")),
+ ),
+ )
def set_if_empty(text, msg="No commit message."):
"""
if first:
first_line = text.split("\n")[0]
- rest = '\n'.join(text.split("\n")[1:])
- return '\n'.join([(first + first_line).rstrip(),
- indent(rest, chars=chars)])
- return '\n'.join([(chars + line).rstrip()
- for line in text.split('\n')])
+ rest = "\n".join(text.split("\n")[1:])
+ return "\n".join([(first + first_line).rstrip(), indent(rest, chars=chars)])
+ return "\n".join([(chars + line).rstrip() for line in text.split("\n")])
def paragraph_wrap(text, regexp="\n\n"):
"""
regexp = re.compile(regexp, re.MULTILINE)
- return "\n".join("\n".join(textwrap.wrap(paragraph.strip()))
- for paragraph in regexp.split(text)).strip()
+ return "\n".join(
+ "\n".join(textwrap.wrap(paragraph.strip())) for paragraph in regexp.split(text)
+ ).strip()
def curryfy(f):
return lambda *a, **kw: TextProc(lambda txt: f(txt, *a, **kw))
+
## these are curryfied version of their lower case definition
Indent = curryfy(indent)
strip = TextProc(lambda txt: txt.strip())
SetIfEmpty = curryfy(set_if_empty)
-for _label in ("Indent", "Wrap", "ReSub", "noop", "final_dot",
- "ucfirst", "strip", "SetIfEmpty"):
+for _label in (
+ "Indent",
+ "Wrap",
+ "ReSub",
+ "noop",
+ "final_dot",
+ "ucfirst",
+ "strip",
+ "SetIfEmpty",
+):
_config_env[_label] = locals()[_label]
##
## File
##
+
def file_get_contents(filename):
with open(filename) as f:
out = f.read()
out = out.decode(_preferred_encoding)
## remove encoding declaration (for some reason, python 2.7
## don't like it).
- out = re.sub(r"^(\s*#.*\s*)coding[:=]\s*([-\w.]+\s*;?\s*)",
- r"\1", out, re.DOTALL)
+ out = re.sub(
+ r"^(\s*#.*\s*)coding[:=]\s*([-\w.]+\s*;?\s*)", r"\1", out, re.DOTALL
+ )
return out
def file_put_contents(filename, string):
"""Write string to filename."""
if PY3:
- fopen = open(filename, 'w', newline='')
+ fopen = open(filename, "w", newline="")
else:
- fopen = open(filename, 'wb')
+ fopen = open(filename, "wb")
with fopen as f:
f.write(string)
## Inferring revision
##
+
def _file_regex_match(filename, pattern, **kw):
if not os.path.isfile(filename):
raise IOError("Can't open file '%s'." % filename)
match = re.search(pattern, file_content, **kw)
if match is None:
stderr("file content: %r" % file_content)
- if isinstance(pattern, type(re.compile(''))):
+ if isinstance(pattern, type(re.compile(""))):
pattern = pattern.pattern
raise ValueError(
- "Regex %s did not match any substring in '%s'."
- % (pattern, filename))
+ "Regex %s did not match any substring in '%s'." % (pattern, filename)
+ )
return match
dct = match.groupdict()
if dct:
if "rev" not in dct:
- warn("Named pattern used, but no one are named 'rev'. "
- "Using full match.")
+ warn(
+ "Named pattern used, but no one are named 'rev'. "
+ "Using full match."
+ )
return match.group(0)
- if dct['rev'] is None:
+ if dct["rev"] is None:
die("Named pattern used, but it was not valued.")
- return dct['rev']
+ return dct["rev"]
return match.group(0)
+
return _call
def Caret(l):
def _call():
return "^%s" % eval_if_callable(l)
+
return _call
+
+
##
## System functions
##
## PY2, ``sys.stdout.encoding`` without PYTHONIOENCODING set does not
## get any values set in subshells. However, if _preferred_encoding
## is not set to utf-8, it leads to encoding errors.
-_preferred_encoding = os.environ.get("PYTHONIOENCODING") or \
- locale.getpreferredencoding()
-DEFAULT_GIT_LOG_ENCODING = 'utf-8'
+_preferred_encoding = (
+ os.environ.get("PYTHONIOENCODING") or locale.getpreferredencoding()
+)
+DEFAULT_GIT_LOG_ENCODING = "utf-8"
class Phile(object):
def __init__(self, command, env=None, encoding=_preferred_encoding):
super(Proc, self).__init__(
- command, shell=True,
- stdin=PIPE, stdout=PIPE, stderr=PIPE,
- close_fds=PLT_CFG['close_fds'], env=env,
- universal_newlines=False)
+ command,
+ shell=True,
+ stdin=PIPE,
+ stdout=PIPE,
+ stderr=PIPE,
+ close_fds=PLT_CFG["close_fds"],
+ env=env,
+ universal_newlines=False,
+ )
self.stdin = Phile(self.stdin, encoding=encoding)
self.stdout = Phile(self.stdout, encoding=encoding)
def cmd(command, env=None, shell=True):
- p = Popen(command, shell=shell,
- stdin=PIPE, stdout=PIPE, stderr=PIPE,
- close_fds=PLT_CFG['close_fds'], env=env,
- universal_newlines=False)
+ p = Popen(
+ command,
+ shell=shell,
+ stdin=PIPE,
+ stdout=PIPE,
+ stderr=PIPE,
+ close_fds=PLT_CFG["close_fds"],
+ env=env,
+ universal_newlines=False,
+ )
out, err = p.communicate()
return (
- out.decode(getattr(sys.stdout, "encoding", None) or
- _preferred_encoding),
- err.decode(getattr(sys.stderr, "encoding", None) or
- _preferred_encoding),
- p.returncode)
+ out.decode(getattr(sys.stdout, "encoding", None) or _preferred_encoding),
+ err.decode(getattr(sys.stderr, "encoding", None) or _preferred_encoding),
+ p.returncode,
+ )
@available_in_config
formatted = []
if out:
- if out.endswith('\n'):
+ if out.endswith("\n"):
out = out[:-1]
formatted.append("stdout:\n%s" % indent(out, "| "))
if err:
- if err.endswith('\n'):
+ if err.endswith("\n"):
err = err[:-1]
formatted.append("stderr:\n%s" % indent(err, "| "))
- msg = '\n'.join(formatted)
-
- raise ShellError("Wrapped command %r exited with errorlevel %d.\n%s"
- % (command, errlvl, indent(msg, chars=" ")),
- errlvl=errlvl, command=command, out=out, err=err)
+ msg = "\n".join(formatted)
+
+ raise ShellError(
+ "Wrapped command %r exited with errorlevel %d.\n%s"
+ % (command, errlvl, indent(msg, chars=" ")),
+ errlvl=errlvl,
+ command=command,
+ out=out,
+ err=err,
+ )
return out
## git information access
##
+
class SubGitObjectMixin(object):
def __init__(self, repos):
GIT_FORMAT_KEYS = {
- 'sha1': "%H",
- 'sha1_short': "%h",
- 'subject': "%s",
- 'author_name': "%an",
- 'author_email': "%ae",
- 'author_date': "%ad",
- 'author_date_timestamp': "%at",
- 'committer_name': "%cn",
- 'committer_date_timestamp': "%ct",
- 'raw_body': "%B",
- 'body': "%b",
+ "sha1": "%H",
+ "sha1_short": "%h",
+ "subject": "%s",
+ "author_name": "%an",
+ "author_email": "%ae",
+ "author_date": "%ad",
+ "author_date_timestamp": "%at",
+ "committer_name": "%cn",
+ "committer_date_timestamp": "%ct",
+ "raw_body": "%B",
+ "body": "%b",
}
GIT_FULL_FORMAT_STRING = "%x00".join(GIT_FORMAT_KEYS.values())
-REGEX_RFC822_KEY_VALUE = \
- r'(^|\n)(?P<key>[A-Z]\w+(-\w+)*): (?P<value>[^\n]*(\n\s+[^\n]*)*)'
-REGEX_RFC822_POSTFIX = \
- r'(%s)+$' % REGEX_RFC822_KEY_VALUE
+REGEX_RFC822_KEY_VALUE = (
+ r"(^|\n)(?P<key>[A-Z]\w+(-\w+)*): (?P<value>[^\n]*(\n\s+[^\n]*)*)"
+)
+REGEX_RFC822_POSTFIX = r"(%s)+$" % REGEX_RFC822_KEY_VALUE
class GitCommit(SubGitObjectMixin):
missing_attrs = [l for l in attrs if l not in self.__dict__]
## some commit can be already fully specified (see ``mk_commit``)
if missing_attrs:
- aformat = "%x00".join(GIT_FORMAT_KEYS[l]
- for l in missing_attrs)
+ aformat = "%x00".join(GIT_FORMAT_KEYS[l] for l in missing_attrs)
try:
- ret = self.git.log([identifier, "--max-count=1",
- "--pretty=format:%s" % aformat, "--"])
+ ret = self.git.log(
+ [identifier, "--max-count=1", "--pretty=format:%s" % aformat, "--"]
+ )
except ShellError:
if DEBUG:
raise
- raise ValueError("Given commit identifier %r doesn't exists"
- % self.identifier)
+ raise ValueError(
+ "Given commit identifier %r doesn't exists" % self.identifier
+ )
attr_values = ret.split("\x00")
for attr, value in zip(missing_attrs, attr_values):
setattr(self, attr, value.strip())
dct = match.groupdict()
key = dct["key"].replace("-", "_").lower()
if "\n" in dct["value"]:
- first_line, remaining = dct["value"].split('\n', 1)
- value = "%s\n%s" % (first_line,
- textwrap.dedent(remaining))
+ first_line, remaining = dct["value"].split("\n", 1)
+ value = "%s\n%s" % (first_line, textwrap.dedent(remaining))
else:
value = dct["value"]
try:
except KeyError:
setattr(self, "trailer_%s" % key, value)
else:
- setattr(self, "trailer_%s" % key,
- prev_value + [value, ]
+ setattr(
+ self,
+ "trailer_%s" % key,
+ (
+ prev_value
+ + [
+ value,
+ ]
if isinstance(prev_value, list)
- else [prev_value, value, ])
+ else [
+ prev_value,
+ value,
+ ]
+ ),
+ )
self._trailer_parsed = True
return getattr(self, label)
@property
def author_names(self):
- return [re.sub(r'^([^<]+)<[^>]+>\s*$', r'\1', author).strip()
- for author in self.authors]
+ return [
+ re.sub(r"^([^<]+)<[^>]+>\s*$", r"\1", author).strip()
+ for author in self.authors
+ ]
@property
def authors(self):
- co_authors = getattr(self, 'trailer_co_authored_by', [])
- co_authors = co_authors if isinstance(co_authors, list) \
- else [co_authors]
- return sorted(co_authors +
- ["%s <%s>" % (self.author_name, self.author_email)])
+ co_authors = getattr(self, "trailer_co_authored_by", [])
+ co_authors = co_authors if isinstance(co_authors, list) else [co_authors]
+ return sorted(co_authors + ["%s <%s>" % (self.author_name, self.author_email)])
@property
def date(self):
- d = datetime.datetime.utcfromtimestamp(
- float(self.author_date_timestamp))
- return d.strftime('%Y-%m-%d')
+ d = datetime.datetime.utcfromtimestamp(float(self.author_date_timestamp))
+ return d.strftime("%Y-%m-%d")
@property
def has_annotated_tag(self):
try:
- self.git.rev_parse(['%s^{tag}' % self.identifier, "--"])
+ self.git.rev_parse(["%s^{tag}" % self.identifier, "--"])
return True
except ShellError as e:
if e.errlvl != 128:
@property
def tagger_date_timestamp(self):
if not self.has_annotated_tag:
- raise ValueError("Can't access 'tagger_date_timestamp' on commit without annotated tag.")
+ raise ValueError(
+ "Can't access 'tagger_date_timestamp' on commit without annotated tag."
+ )
tagger_date_utc = self.git.for_each_ref(
- 'refs/tags/%s' % self.identifier, format='%(taggerdate:raw)')
+ "refs/tags/%s" % self.identifier, format="%(taggerdate:raw)"
+ )
return tagger_date_utc.split(" ", 1)[0]
@property
def tagger_date(self):
- d = datetime.datetime.utcfromtimestamp(
- float(self.tagger_date_timestamp))
- return d.strftime('%Y-%m-%d')
+ d = datetime.datetime.utcfromtimestamp(float(self.tagger_date_timestamp))
+ return d.strftime("%Y-%m-%d")
def __le__(self, value):
if not isinstance(value, GitCommit):
res = self.git.config(label)
except ShellError as e:
if e.errlvl == 1 and e.out == "":
- raise AttributeError("key %r is not found in git config."
- % label)
+ raise AttributeError("key %r is not found in git config." % label)
raise
return res
return swrap(command, **kwargs)
def method(*args, **kwargs):
- if (len(args) == 1 and not isinstance(args[0], basestring)):
+ if len(args) == 1 and not isinstance(args[0], basestring):
return dir_swrap(
- ['git', label, ] + args[0],
+ [
+ "git",
+ label,
+ ]
+ + args[0],
shell=False,
- env=kwargs.get("env", None))
+ env=kwargs.get("env", None),
+ )
cli_args = []
for key, value in kwargs.items():
- cli_key = (("-%s" if len(key) == 1 else "--%s")
- % key.replace("_", "-"))
+ cli_key = ("-%s" if len(key) == 1 else "--%s") % key.replace("_", "-")
if isinstance(value, bool):
cli_args.append(cli_key)
else:
cli_args.extend(args)
- return dir_swrap(['git', label, ] + cli_args, shell=False)
+ return dir_swrap(
+ [
+ "git",
+ label,
+ ]
+ + cli_args,
+ shell=False,
+ )
+
return method
raise
raise EnvironmentError(
"Required ``git`` command not found or broken in $PATH. "
- "(calling ``git version`` failed.)")
+ "(calling ``git version`` failed.)"
+ )
## verify that we are in a git repository
try:
if DEBUG:
raise
raise EnvironmentError(
- "Not in a git repository. (calling ``git remote`` failed.)")
+ "Not in a git repository. (calling ``git remote`` failed.)"
+ )
self.bare = self.git.rev_parse(is_bare_repository=True) == "true"
- self.toplevel = (None if self.bare else
- self.git.rev_parse(show_toplevel=True))
- self.gitdir = normpath(self.git.rev_parse(git_dir=True),
- cwd=self._orig_path)
+ self.toplevel = None if self.bare else self.git.rev_parse(show_toplevel=True)
+ self.gitdir = normpath(self.git.rev_parse(git_dir=True), cwd=self._orig_path)
@classmethod
def create(cls, directory, *args, **kwargs):
## ``git tags --sort -v:refname`` in git version >2.0.
## Sorting and reversing with command line is not available on
## git version <2.0
- return sorted([self.commit(tag) for tag in tags if tag != ''],
- key=lambda x: int(x.committer_date_timestamp))
+ return sorted(
+ [self.commit(tag) for tag in tags if tag != ""],
+ key=lambda x: int(x.committer_date_timestamp),
+ )
- def log(self, includes=["HEAD", ], excludes=[], include_merge=True,
- encoding=_preferred_encoding):
+ def log(
+ self,
+ includes=[
+ "HEAD",
+ ],
+ excludes=[],
+ include_merge=True,
+ encoding=_preferred_encoding,
+ ):
"""Reverse chronological list of git repository's commits
Note: rev lists can be GitCommit instance list or identifier list.
"""
- refs = {'includes': includes,
- 'excludes': excludes}
- for ref_type in ('includes', 'excludes'):
+ refs = {"includes": includes, "excludes": excludes}
+ for ref_type in ("includes", "excludes"):
for idx, ref in enumerate(refs[ref_type]):
if not isinstance(ref, GitCommit):
refs[ref_type][idx] = self.commit(ref)
## --topo-order: don't mix commits from separate branches.
- plog = Proc("git log --stdin -z --topo-order --pretty=format:%s %s --"
- % (GIT_FULL_FORMAT_STRING,
- '--no-merges' if not include_merge else ''),
- encoding=encoding)
+ plog = Proc(
+ "git log --stdin -z --topo-order --pretty=format:%s %s --"
+ % (GIT_FULL_FORMAT_STRING, "--no-merges" if not include_merge else ""),
+ encoding=encoding,
+ )
for ref in refs["includes"]:
plog.stdin.write("%s\n" % ref.sha1)
try:
while True: ## next(values) will eventualy raise a StopIteration
- yield mk_commit(dict([(key, next(values))
- for key in GIT_FORMAT_KEYS]))
+ yield mk_commit(dict([(key, next(values)) for key in GIT_FORMAT_KEYS]))
except StopIteration:
pass ## since 3.7, we are not allowed anymore to trickle down
- ## StopIteration.
+ ## StopIteration.
finally:
plog.stdout.close()
plog.stderr.close()
"""
try:
- template_path = GitRepos(os.getcwd()).config.get(
- "gitchangelog.template-path")
+ template_path = GitRepos(os.getcwd()).config.get("gitchangelog.template-path")
except ShellError as e:
stderr(
"Error parsing git config: %s."
- " Won't be able to read 'template-path' if defined."
- % (str(e)))
+ " Won't be able to read 'template-path' if defined." % (str(e))
+ )
template_path = None
if template_path:
path_file = path_label = template_path
else:
path_file = os.getcwd()
- path_label = os.path.join(os.path.dirname(os.path.realpath(__file__)),
- "templates", label)
+ path_label = os.path.join(
+ os.path.dirname(os.path.realpath(__file__)), "templates", label
+ )
- for ftn in [os.path.join(path_file, template_name),
- os.path.join(path_label, "%s.tpl" % template_name)]:
+ for ftn in [
+ os.path.join(path_file, template_name),
+ os.path.join(path_label, "%s.tpl" % template_name),
+ ]:
if os.path.isfile(ftn):
return ftn
templates = glob.glob(os.path.join(path_label, "*.tpl"))
if len(templates) > 0:
- msg = ("These are the available %s templates:" % label)
- msg += "\n - " + \
- "\n - ".join(os.path.basename(f).split(".")[0]
- for f in templates)
+ msg = "These are the available %s templates:" % label
+ msg += "\n - " + "\n - ".join(
+ os.path.basename(f).split(".")[0] for f in templates
+ )
msg += "\nTemplates are located in %r" % path_label
else:
- msg = "No available %s templates found in %r." \
- % (label, path_label)
- die("Error: Invalid %s template name %r.\n" % (label, template_name) +
- "%s" % msg)
+ msg = "No available %s templates found in %r." % (label, path_label)
+ die("Error: Invalid %s template name %r.\n" % (label, template_name) + "%s" % msg)
##
## Output Engines
##
+
@available_in_config
def rest_py(data, opts={}):
"""Returns ReStructured Text changelog content from data"""
return (label.strip() + "\n") + (char * len(label) + "\n")
def render_version(version):
- title = "%s (%s)" % (version["tag"], version["date"]) \
- if version["tag"] else \
- opts["unreleased_version_label"]
+ title = (
+ "%s (%s)" % (version["tag"], version["date"])
+ if version["tag"]
+ else opts["unreleased_version_label"]
+ )
s = rest_title(title, char="-")
sections = version["sections"]
nb_sections = len(sections)
for section in sections:
- section_label = section["label"] if section.get("label", None) \
- else "Other"
+ section_label = section["label"] if section.get("label", None) else "Other"
if not (section_label == "Other" and nb_sections == 1):
s += "\n" + rest_title(section_label, "~")
def render_commit(commit, opts=opts):
subject = commit["subject"]
- subject += " [%s]" % (", ".join(commit["authors"]), )
+ subject += " [%s]" % (", ".join(commit["authors"]),)
- entry = indent('\n'.join(textwrap.wrap(subject)),
- first="- ").strip() + "\n"
+ entry = indent("\n".join(textwrap.wrap(subject)), first="- ").strip() + "\n"
if commit["body"]:
entry += "\n" + indent(commit["body"])
def stuffed_versions(versions, opts):
for version in versions:
- title = "%s (%s)" % (version["tag"], version["date"]) \
- if version["tag"] else \
- opts["unreleased_version_label"]
+ title = (
+ "%s (%s)" % (version["tag"], version["date"])
+ if version["tag"]
+ else opts["unreleased_version_label"]
+ )
version["label"] = title
version["label_chars"] = list(version["label"])
for section in version["sections"]:
section["label_chars"] = list(section["label"])
- section["display_label"] = \
- not (section["label"] == "Other" and
- len(version["sections"]) == 1)
+ section["display_label"] = not (
+ section["label"] == "Other" and len(version["sections"]) == 1
+ )
for commit in section["commits"]:
- commit["author_names_joined"] = ", ".join(
- commit["authors"])
+ commit["author_names_joined"] = ", ".join(commit["authors"])
commit["body_indented"] = indent(commit["body"])
yield version
if mako:
- import mako.template ## pylint: disable=wrong-import-position
+ import mako.template ## pylint: disable=wrong-import-position
- mako_env = dict((f.__name__, f) for f in (ucfirst, indent, textwrap,
- paragraph_wrap))
+ mako_env = dict(
+ (f.__name__, f) for f in (ucfirst, indent, textwrap, paragraph_wrap)
+ )
@available_in_config
def makotemplate(template_name):
def renderer(data, opts):
kwargs = mako_env.copy()
- kwargs.update({"data": data,
- "opts": opts})
+ kwargs.update({"data": data, "opts": opts})
return template.render(**kwargs)
return renderer
## Publish action
##
+
@available_in_config
def stdout(content):
for chunk in content:
safe_print(chunk)
+
+
@available_in_config
-def FileInsertAtFirstRegexMatch(filename, pattern, flags=0,
- idx=lambda m: m.start()):
+def FileInsertAtFirstRegexMatch(filename, pattern, flags=0, idx=lambda m: m.start()):
def write_content(f, content):
for content_line in content:
offset = new_offset
dst.write(line)
continue
- dst.write(line[0:index - offset])
+ dst.write(line[0 : index - offset])
write_content(dst, content)
- dst.write(line[index - offset:])
+ dst.write(line[index - offset :])
postfix = True
if not postfix:
write_content(dst, content)
@available_in_config
def FileRegexSubst(filename, pattern, replace, flags=0):
- replace = re.sub(r'\\([0-9+])', r'\\g<\1>', replace)
+ replace = re.sub(r"\\([0-9+])", r"\\g<\1>", replace)
def _wrapped(content):
src = file_get_contents(filename)
## Protect replacement pattern against the following expansion of '\o'
src = re.sub(
- pattern,
- replace.replace(r'\o', "".join(content).replace('\\', '\\\\')),
- src, flags=flags)
+ pattern,
+ replace.replace(r"\o", "".join(content).replace("\\", "\\\\")),
+ src,
+ flags=flags,
+ )
if not PY3:
src = src.encode(_preferred_encoding)
file_put_contents(filename, src)
## Data Structure
##
-def versions_data_iter(repository, revlist=None,
- ignore_regexps=[],
- section_regexps=[(None, '')],
- tag_filter_regexp=r"\d+\.\d+(\.\d+)?",
- include_merge=True,
- body_process=lambda x: x,
- subject_process=lambda x: x,
- log_encoding=DEFAULT_GIT_LOG_ENCODING,
- warn=warn, ## Mostly used for test
- ):
+
+def versions_data_iter(
+ repository,
+ revlist=None,
+ ignore_regexps=[],
+ section_regexps=[(None, "")],
+ tag_filter_regexp=r"\d+\.\d+(\.\d+)?",
+ include_merge=True,
+ body_process=lambda x: x,
+ subject_process=lambda x: x,
+ log_encoding=DEFAULT_GIT_LOG_ENCODING,
+ warn=warn, ## Mostly used for test
+):
"""Returns an iterator through versions data structures
(see ``gitchangelog.rc.reference`` file for more info)
## Hash to speedup lookups
versions_done = {}
- excludes = [rev[1:]
- for rev in repository.git.rev_parse([
- "--rev-only", ] + revlist + ["--", ]).split("\n")
- if rev.startswith("^")] if revlist else []
+ excludes = (
+ [
+ rev[1:]
+ for rev in repository.git.rev_parse(
+ [
+ "--rev-only",
+ ]
+ + revlist
+ + [
+ "--",
+ ]
+ ).split("\n")
+ if rev.startswith("^")
+ ]
+ if revlist
+ else []
+ )
revs = repository.git.rev_list(*revlist).split("\n") if revlist else []
revs = [rev for rev in revs if rev != ""]
if revlist and not revs:
- die("No commits matching given revlist: %s" % (" ".join(revlist), ))
+ die("No commits matching given revlist: %s" % (" ".join(revlist),))
- tags = [tag
- for tag in repository.tags(contains=revs[-1] if revs else None)
- if re.match(tag_filter_regexp, tag.identifier)]
+ tags = [
+ tag
+ for tag in repository.tags(contains=revs[-1] if revs else None)
+ if re.match(tag_filter_regexp, tag.identifier)
+ ]
tags.append(repository.commit("HEAD"))
sections = collections.defaultdict(list)
commits = repository.log(
includes=[min(tag, max_rev)],
- excludes=tags[idx + 1:] + excludes,
+ excludes=tags[idx + 1 :] + excludes,
include_merge=include_merge,
- encoding=log_encoding)
+ encoding=log_encoding,
+ )
for commit in commits:
- if any(re.search(pattern, commit.subject) is not None
- for pattern in ignore_regexps):
+ if any(
+ re.search(pattern, commit.subject) is not None
+ for pattern in ignore_regexps
+ ):
continue
matched_section = first_matching(section_regexps, commit.subject)
## Finally storing the commit in the matching section
- sections[matched_section].append({
- "author": commit.author_name,
- "authors": commit.author_names,
- "subject": subject_process(commit.subject),
- "body": body_process(commit.body),
- "commit": commit,
- })
+ sections[matched_section].append(
+ {
+ "author": commit.author_name,
+ "authors": commit.author_names,
+ "subject": subject_process(commit.subject),
+ "body": body_process(commit.body),
+ "commit": commit,
+ }
+ )
## Flush current version
- current_version["sections"] = [{"label": k, "commits": sections[k]}
- for k in section_order
- if k in sections]
+ current_version["sections"] = [
+ {"label": k, "commits": sections[k]} for k in section_order if k in sections
+ ]
if len(current_version["sections"]) != 0:
yield current_version
versions_done[tag] = current_version
-def changelog(output_engine=rest_py,
- unreleased_version_label="unreleased",
- warn=warn, ## Mostly used for test
- **kwargs):
+def changelog(
+ output_engine=rest_py,
+ unreleased_version_label="unreleased",
+ warn=warn, ## Mostly used for test
+ **kwargs
+):
"""Returns a string containing the changelog of given repository
This function returns a string corresponding to the template rendered with
"""
opts = {
- 'unreleased_version_label': unreleased_version_label,
+ "unreleased_version_label": unreleased_version_label,
}
## Setting main container of changelog elements
title = None if kwargs.get("revlist") else "Changelog"
- data = {"title": title,
- "versions": []}
+ data = {"title": title, "versions": []}
versions = versions_data_iter(warn=warn, **kwargs)
return output_engine(data=data, opts=opts)
+
##
## Manage obsolete options
##
"""
if "replace_regexps" in config:
for pattern, replace in config["replace_regexps"].items():
- config["subject_process"] = \
- ReSub(pattern, replace) | \
- config.get("subject_process", ucfirst | final_dot)
+ config["subject_process"] = ReSub(pattern, replace) | config.get(
+ "subject_process", ucfirst | final_dot
+ )
@obsolete_option_manager
"""
if "body_split_regex" in config:
- config["body_process"] = Wrap(config["body_split_regex"]) | \
- config.get("body_process", noop)
+ config["body_process"] = Wrap(config["body_split_regex"]) | config.get(
+ "body_process", noop
+ )
def manage_obsolete_options(config):
## Command line parsing
##
+
def parse_cmd_line(usage, description, epilog, exname, version):
import argparse
- kwargs = dict(usage=usage,
- description=description,
- epilog="\n" + epilog,
- prog=exname,
- formatter_class=argparse.RawTextHelpFormatter)
+
+ kwargs = dict(
+ usage=usage,
+ description=description,
+ epilog="\n" + epilog,
+ prog=exname,
+ formatter_class=argparse.RawTextHelpFormatter,
+ )
try:
parser = argparse.ArgumentParser(version=version, **kwargs)
except TypeError: ## compat with argparse from python 3.4
parser = argparse.ArgumentParser(**kwargs)
- parser.add_argument('-v', '--version',
- help="show program's version number and exit",
- action="version", version=version)
+ parser.add_argument(
+ "-v",
+ "--version",
+ help="show program's version number and exit",
+ action="version",
+ version=version,
+ )
- parser.add_argument('-d', '--debug',
- help="Enable debug mode (show full tracebacks).",
- action="store_true", dest="debug")
- parser.add_argument('revlist', nargs='*', action="store", default=[])
+ parser.add_argument(
+ "-d",
+ "--debug",
+ help="Enable debug mode (show full tracebacks).",
+ action="store_true",
+ dest="debug",
+ )
+ parser.add_argument("revlist", nargs="*", action="store", default=[])
## Remove "show" as first argument for compatibility reason.
continue
if arg == "show":
warn("'show' positional argument is deprecated.")
- argv += sys.argv[i + 2:]
+ argv += sys.argv[i + 2 :]
break
else:
- argv += sys.argv[i + 1:]
+ argv += sys.argv[i + 1 :]
break
return parser.parse_args(argv)
if revs:
revs = eval_if_callable(revs)
if not isinstance(revs, list):
- die("Invalid type for 'revs' in config file. "
+ die(
+ "Invalid type for 'revs' in config file. "
"A 'list' type is required, and a %r was given."
- % type(revs).__name__)
- revs = [eval_if_callable(rev)
- for rev in revs]
+ % type(revs).__name__
+ )
+ revs = [eval_if_callable(rev) for rev in revs]
else:
revs = []
for rev in revs:
if not isinstance(rev, basestring):
- die("Invalid type for revision in revs list from config file. "
- "'str' type is required, and a %r was given."
- % type(rev).__name__)
+ die(
+ "Invalid type for revision in revs list from config file. "
+ "'str' type is required, and a %r was given." % type(rev).__name__
+ )
try:
repository.git.rev_parse([rev, "--rev_only", "--"])
except ShellError:
raise
die("Revision %r is not valid." % rev)
- if revs == ["HEAD", ]:
+ if revs == [
+ "HEAD",
+ ]:
return []
return revs
except ShellError as e:
warn(
"Error parsing git config: %s."
- " Couldn't check if 'i18n.logOuputEncoding' was set."
- % (str(e)))
+ " Couldn't check if 'i18n.logOuputEncoding' was set." % (str(e))
+ )
## Final defaults coming from git defaults
return log_encoding or DEFAULT_GIT_LOG_ENCODING
## Config Manager
##
+
class Config(dict):
def __getitem__(self, label):
## Safe print
##
+
def safe_print(content):
if not PY3:
if isinstance(content, unicode):
content = content.encode(_preferred_encoding)
try:
- print(content, end='')
+ print(content, end="")
sys.stdout.flush()
except UnicodeEncodeError:
if DEBUG:
raise
## XXXvlab: should use $COLUMNS in bash and for windows:
## http://stackoverflow.com/questions/14978548
- stderr(paragraph_wrap(textwrap.dedent("""\
+ stderr(
+ paragraph_wrap(
+ textwrap.dedent(
+ """\
UnicodeEncodeError:
There was a problem outputing the resulting changelog to
your console.
This probably means that the changelog contains characters
that can't be translated to characters in your current charset
(%s).
- """) % sys.stdout.encoding))
- if WIN32 and PY_VERSION < 3.6 and sys.stdout.encoding != 'utf-8':
+ """
+ )
+ % sys.stdout.encoding
+ )
+ )
+ if WIN32 and PY_VERSION < 3.6 and sys.stdout.encoding != "utf-8":
## As of PY 3.6, encoding is now ``utf-8`` regardless of
## PYTHONIOENCODING
## https://www.python.org/dev/peps/pep-0528/
- stderr(" You might want to try to fix that by setting "
- "PYTHONIOENCODING to 'utf-8'.")
+ stderr(
+ " You might want to try to fix that by setting "
+ "PYTHONIOENCODING to 'utf-8'."
+ )
exit(1)
except IOError as e:
if e.errno == 0 and not PY3 and WIN32:
## Yes, had a strange IOError Errno 0 after outputing string
## that contained UTF-8 chars on Windows and PY2.7
pass ## Ignoring exception
- elif ((WIN32 and e.errno == 22) or ## Invalid argument
- (not WIN32 and e.errno == errno.EPIPE)): ## Broken Pipe
+ elif (WIN32 and e.errno == 22) or ( ## Invalid argument
+ not WIN32 and e.errno == errno.EPIPE
+ ): ## Broken Pipe
## Nobody is listening anymore to stdout it seems. Let's bailout.
if PY3:
try:
except BrokenPipeError: ## expected outcome on linux
pass
except OSError as e2:
- if e2.errno != 22: ## expected outcome on WIN32
+ if e2.errno != 22: ## expected outcome on WIN32
raise
## Yay ! stdout is closed we can now exit safely.
exit(0)
## Main
##
+
def main():
global DEBUG
## Basic environment infos
reference_config = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- "gitchangelog.rc.reference")
+ os.path.dirname(os.path.realpath(__file__)), "gitchangelog.rc.reference"
+ )
basename = os.path.basename(sys.argv[0])
if basename.endswith(".py"):
debug_varname = "DEBUG_%s" % basename.upper()
DEBUG = os.environ.get(debug_varname, False)
- i = lambda x: x % {'exname': basename}
+ i = lambda x: x % {"exname": basename}
- opts = parse_cmd_line(usage=i(usage_msg),
- description=i(description_msg),
- epilog=i(epilog_msg),
- exname=basename,
- version=__version__)
+ opts = parse_cmd_line(
+ usage=i(usage_msg),
+ description=i(description_msg),
+ epilog=i(epilog_msg),
+ exname=basename,
+ version=__version__,
+ )
DEBUG = DEBUG or opts.debug
try:
except ShellError as e:
stderr(
"Error parsing git config: %s."
- " Won't be able to read 'rc-path' if defined."
- % (str(e)))
+ " Won't be able to read 'rc-path' if defined." % (str(e))
+ )
gc_rc = None
gc_rc = normpath(gc_rc, cwd=repository.toplevel) if gc_rc else None
## config file lookup resolution
for enforce_file_existence, fun in [
- (True, lambda: os.environ.get('GITCHANGELOG_CONFIG_FILENAME')),
+ (True, lambda: os.environ.get("GITCHANGELOG_CONFIG_FILENAME")),
(True, lambda: gc_rc),
- (False,
- lambda: (os.path.join(repository.toplevel, ".%s.rc" % basename))
- if not repository.bare else None)]:
+ (
+ False,
+ lambda: (
+ (os.path.join(repository.toplevel, ".%s.rc" % basename))
+ if not repository.bare
+ else None
+ ),
+ ),
+ ]:
changelogrc = fun()
if changelogrc:
if not os.path.exists(changelogrc):
config = load_config_file(
os.path.expanduser(changelogrc),
default_filename=reference_config,
- fail_if_not_present=False)
+ fail_if_not_present=False,
+ )
config = Config(config)
log_encoding = get_log_encoding(repository, config)
revlist = get_revision(repository, config, opts)
- config['unreleased_version_label'] = eval_if_callable(
- config['unreleased_version_label'])
+ config["unreleased_version_label"] = eval_if_callable(
+ config["unreleased_version_label"]
+ )
manage_obsolete_options(config)
try:
content = changelog(
- repository=repository, revlist=revlist,
- ignore_regexps=config['ignore_regexps'],
- section_regexps=config['section_regexps'],
- unreleased_version_label=config['unreleased_version_label'],
- tag_filter_regexp=config['tag_filter_regexp'],
+ repository=repository,
+ revlist=revlist,
+ ignore_regexps=config["ignore_regexps"],
+ section_regexps=config["section_regexps"],
+ unreleased_version_label=config["unreleased_version_label"],
+ tag_filter_regexp=config["tag_filter_regexp"],
output_engine=config.get("output_engine", rest_py),
include_merge=config.get("include_merge", True),
body_process=config.get("body_process", noop),
except KeyboardInterrupt:
if DEBUG:
- err("Keyboard interrupt received while running '%s':"
- % (basename, ))
+ err("Keyboard interrupt received while running '%s':" % (basename,))
stderr(format_last_exception())
else:
err("Keyboard Interrupt. Bailing out.")
exit(130) ## Actual SIGINT as bash process convention.
except Exception as e: ## pylint: disable=broad-except
if DEBUG:
- err("Exception while running '%s':"
- % (basename, ))
+ err("Exception while running '%s':" % (basename,))
stderr(format_last_exception())
else:
message = "%s" % e
err(message)
- stderr(" (set %s environment variable, "
- "or use ``--debug`` to see full traceback)" %
- (debug_varname, ))
+ stderr(
+ " (set %s environment variable, "
+ "or use ``--debug`` to see full traceback)" % (debug_varname,)
+ )
exit(255)