--- /dev/null
+# -*- coding: utf-8 -*- ex:set ts=4 sw=4 et:
+
+# Copyright © 2008 - Steve Frécinaux
+# License: LGPL 2
+
+from repository import Repository, InvalidRepositoryError
+from objects import Commit, Tree, Blob
--- /dev/null
+# -*- coding: utf-8 -*- ex:set ts=4 sw=4 et:
+
+# Copyright © 2008 - Steve Frécinaux
+# License: LGPL 2
+
+class Config(dict):
+ def __init__(self, repo):
+ dict.__init__(self)
+ self._repo = repo
+ self._load()
+
+ def _load(self):
+ self._data = {}
+ for line in self._repo.run('config', '--list'):
+ key, value = line.strip().split('=', 1)
+ dict.__setitem__(self, key, value.decode('utf-8'))
+
+ def __setitem__(self, key, value):
+ dict.__setitem__(self, key, value)
+ # update the repo config
+ self._repo.run.run_noio(['config', 'key', str(value)])
+
+if __name__ == '__main__':
+ conf = Config()
--- /dev/null
+# -*- coding: utf-8 -*- ex:set ts=4 sw=4 et:
+
+# Copyright © 2008 - Steve Frécinaux
+# License: LGPL 2
+
+import subprocess
+import select
+import exceptions
+import os
+
+class ExecutionError(exceptions.Exception):
+ pass
+
+class GitBinary(object):
+ binary = ['/usr/bin/env', 'git']
+
+ def __init__(self, repo_dir, bare=False):
+ self.repo_args = []
+ if bare:
+ self.repo_args.append('--bare')
+ else:
+ self.repo_args.append('--work-tree=%s' % repo_dir)
+ repo_dir = os.path.join(repo_dir, '.git')
+ self.repo_args.append('--git-dir=%s' % repo_dir)
+ self.bare = bare
+ self.git_dir = repo_dir
+
+ def _command(self, *args):
+ if args[0] == 'clone':
+ return self.binary + list(args)
+ else:
+ return self.binary + self.repo_args + list(args)
+
+ def gen(self, p):
+ while True:
+ rlist = select.select([p.stdout], [], [])[0]
+ if p.stdout in rlist:
+ line = p.stdout.readline()
+ if line:
+ yield line.rstrip("\n")
+ else:
+ break
+ p.stdout.close()
+
+ if p.wait() != 0:
+ raise ExecutionError("Subprocess exited with non-zero returncode"
+ " of %d" % p.returncode)
+
+ def __call__(self, *args, **kwargs):
+ cmd = self._command(args)
+
+ # The input parameter allows to feed the process's stdin
+ input = kwargs.get('input', None)
+ has_input = input is not None
+
+ # The wait parameter will make the function wait for the process to
+ # have completed and return the full output at once.
+ wait = bool(kwargs.get('wait', False))
+
+ # The output parameter will make the function watch for some output.
+ has_output = bool(kwargs.get('output', True))
+
+ p = subprocess.Popen(self._command(*args),
+ stdin = has_input and subprocess.PIPE or None,
+ stdout = has_output and subprocess.PIPE or None,
+ bufsize=1)
+ if has_input:
+ p.stdin.write(input)
+ p.stdin.close()
+
+ if has_output:
+ gen = self.gen(p)
+ return wait and '\n'.join(gen) or gen
+
+ if p.wait() != 0:
+ raise ExecutionError("Subprocess exited with non-zero returncode"
+ " of %d" % p.returncode)
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright © 2008 - Steve Frécinaux
+# License: LGPL 2
+
+__all__ = ['issha1']
+
+import re
+
+SHA1_PATTERN = re.compile('^[a-f0-9]{40}$')
+
+def issha1(s):
+ return SHA1_PATTERN.match(s) is not None
--- /dev/null
+# -*- coding: utf-8 -*- ex:set ts=4 sw=4 et:
+
+# Copyright © 2008 - Steve Frécinaux
+# License: LGPL 2
+
+class Object(object):
+ "An object, following Git's definition."
+
+ def __init__(self, repo, objectname):
+ self._repo = repo
+ self._name = objectname
+
+ def __str__(self):
+ return self._name or ''
+
+ def __repr__(self):
+ return '<git.%s "%s">' % (self.__class__.__name__, self._name)
+
+ def __eq__(self, other):
+ # Objects with no name are never equal to any other object.
+ return self._name is not None and self._name == other._name
+
+ def _dirty(self):
+ """
+ Mark an object as dirty. As a result, all its parent objects are
+ marked as dirty too.
+ """
+ # We are already dirty, so our parents should be too.
+ if self._name is None:
+ return
+
+ self._name = None
+ if hasattr(self, 'parent') and self.parent is not None:
+ self.parent._dirty()
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def shortname(self):
+ if self._name is None:
+ return None
+ return self.name[:8]
+
+class Commit(Object):
+ def __init__(self, repo, objectname=None, refname=None):
+ Object.__init__(self, repo, objectname)
+ if objectname is None:
+ self._loaded = True
+ self._tree = Tree(repo, parent=self)
+ self._message = ''
+ else:
+ self._loaded = False
+ self._tree = None
+ self._parents = []
+ self._refname = refname
+
+ def _load(self):
+ if self._loaded: return
+ self._message = '';
+ if self._name is None:
+ return
+ is_header = True
+ for line in self._repo.run('cat-file', 'commit', self._name):
+ if is_header:
+ line = line.strip()
+ if line == '':
+ is_header = False
+ continue
+ key, value = line.split(' ', 1)
+ if key == 'tree':
+ self._tree = Tree(self._repo, value, parent=self)
+ continue
+ if key == 'parent':
+ self._parents.append(value)
+ continue
+ if key == 'author':
+ author, timestamp, offset = value.rsplit(' ', 2)
+ self._author = author
+ self._author_timestamp = timestamp
+ self._author_offset = offset
+ continue
+ if key == 'committer':
+ author, timestamp, offset = value.rsplit(' ', 2)
+ self._committer = author
+ self._committer_timestamp = timestamp
+ self._committer_offset = offset
+ continue
+ continue
+ self._message += line
+
+ def _dirty(self):
+ old_name = self._name
+ if old_name is not None:
+ Object._dirty(self)
+ self._parents = [old_name]
+
+ @property
+ def parents(self):
+ self._load()
+ return [Commit(self._repo, c) for c in self._parents]
+
+ @property
+ def tree(self):
+ self._load()
+ return self._tree
+
+ @property
+ def author(self):
+ self._load()
+ return self._author
+
+ @property
+ def comitter(self):
+ self._load()
+ return self._committer
+
+ @property
+ def message(self):
+ self._load()
+ return self._message
+
+ @property
+ def refname(self):
+ return self._refname
+
+ def write(self):
+ if self._name is not None:
+ return self._name
+
+ tree_name = self.tree.write()
+
+ cmd = ['commit-tree', tree_name]
+ for p in self._parents:
+ cmd.extend(['-p', p])
+
+ self._name = self._repo.run(input=self.message, wait=True, *cmd).strip()
+ return self._name
+
+ def commit(self):
+ self.write()
+ if self.refname is not None:
+ self._repo.run('update-ref', self.refname, self._name, output=False)
+
+class Tree(Object):
+ def __init__(self, repo, objectname=None, mode='040000', parent=None):
+ Object.__init__(self, repo, objectname)
+ self._parent = parent
+ self.mode = mode
+ if objectname is None:
+ self._loaded = True
+ self._contents = {}
+ else:
+ self._loaded = False
+
+ def _load(self):
+ if self._loaded:
+ return
+ self._contents = {}
+ if self._name is None:
+ return
+ for line in self._repo.run('cat-file', '-p', self._name):
+ mode, objtype, objname, filename = line.split(None, 3)
+ if objtype == 'tree':
+ self._contents[filename] = Tree(self._repo, objname, mode=mode, parent=self)
+ elif objtype == 'blob':
+ self._contents[filename] = Blob(self._repo, objname, mode=mode, parent=self)
+ else:
+ raise Exception("Unknown object type: '%s'" % objtype)
+
+ def __getitem__(self, filename):
+ self._load()
+ return self._contents[filename]
+
+ def __setitem__(self, filename, obj):
+ if not isinstance(filename, str):
+ raise ValueError("filename must be a string.")
+ if '/' in filename:
+ raise ValueError("filename cannot contain the '/' symbol.")
+ if not isinstance(obj, Blob) and not isinstance(obj, Tree):
+ raise ValueError("value must be a Blob or Tree object.")
+
+ self._load()
+ self._contents[filename] = obj
+ obj._parent = self
+ self._dirty()
+
+ def __iter__(self):
+ self._load()
+ return iter(self._contents)
+
+ def keys(self):
+ self._load()
+ return self._contents.keys()
+
+ @property
+ def parent(self):
+ "parent of this object"
+ return self._parent
+
+ @property
+ def root(self):
+ "root tree of this object"
+ if isinstance(self._parent, Commit):
+ return self
+ else:
+ return self._parent.root
+
+ def write(self):
+ if self._name is not None:
+ return self._name
+
+ data = []
+ for path in self._contents:
+ obj = self._contents[path]
+ obj.write()
+ objtype = isinstance(obj, Tree) and 'tree' or 'blob'
+ data.append("%s %s %s\t%s" % (obj.mode, objtype, obj.name, path))
+
+ self._name = self._repo.run('mktree', '-z', input='\0'.join(data), wait=True).strip()
+ return self._name
+
+class Blob(Object):
+ def __init__(self, repo, objectname=None, mode='100644', parent=None):
+ Object.__init__(self, repo, objectname)
+ self._parent = parent
+ if objectname is None:
+ self._contents = ''
+ self._loaded = True
+ else:
+ self._loaded = False
+ self.mode = mode
+
+ def _load(self):
+ if self._loaded: return
+ self._contents = self._repo.run('cat-file', 'blob', self._name, wait=True)
+
+ # Contents property
+ def _get_contents(self):
+ self._load()
+ return self._contents
+
+ def _set_contents(self, contents):
+ self._loaded = True # No need to actually load the data here.
+ self._contents = contents
+ self._dirty()
+
+ contents = property(_get_contents, _set_contents)
+ del _get_contents
+ del _set_contents
+
+ @property
+ def parent(self):
+ "parent of this object"
+ return self._parent
+
+ @property
+ def root(self):
+ "root tree of this object"
+ return self._parent.root
+
+ def write(self):
+ if self._name is None:
+ self._name = self._repo.run('hash-object', '-w', '--stdin', input=self.contents, wait=True).strip()
+ return self._name
--- /dev/null
+# -*- coding: utf-8 -*- ex:set ts=4 sw=4 et:
+
+# Copyright © 2008 - Steve Frécinaux
+# License: LGPL 2
+
+import exceptions
+import os
+
+from config import Config
+from gitbinary import GitBinary
+from objects import Commit
+from misc import issha1
+
+class InvalidRepositoryError(exceptions.Exception):
+ pass
+
+class Repository(object):
+ "A Git repository."
+
+ def __init__(self, path, create=False):
+ abspath = os.path.abspath(path)
+ if not os.path.isdir(abspath):
+ raise exceptions.IOError("No such directory: '%s'" % abspath)
+
+ # Find the right path for the repository.
+ if abspath.endswith('.git'):
+ self._path = abspath
+ self._bare = True
+ else:
+ self._path = os.path.join(abspath, '.git')
+ self._bare = False
+
+ # Internal git binary.
+ self.run = GitBinary(abspath, bare=self._bare)
+
+ if create:
+ self.run('init', '--quiet', output=False);
+
+ # Check if we are in a valid repository (heuristics)
+ # FIXME: what if .git is a plain file?
+ if not os.path.isdir(self._path) or \
+ not os.path.isdir(os.path.join(self._path, 'objects')) or \
+ not os.path.isdir(os.path.join(self._path, 'refs')):
+ raise InvalidRepositoryError(abspath)
+
+ def __repr__(self):
+ return '<git.Repository "%s">' % self._path
+
+ # Description property
+ def _get_description(self):
+ filename = os.path.join(self._path, 'description')
+ return file(filename).read().strip()
+
+ def _set_description(self, descr):
+ filename = os.path.join(self._path, 'description')
+ file(filename, 'w').write(descr+"\n")
+
+ description = property(_get_description, _set_description,
+ doc="repository description")
+ del _get_description
+ del _set_description
+
+ # Daemon export property
+ def _get_daemon_export(self):
+ return os.path.isfile(os.path.join(self._path, 'git-daemon-export-ok'))
+
+ def _set_daemon_export(self, value):
+ filename = os.path.join(self._path, 'git-daemon-export-ok')
+ fileexists = os.path.exists(filename)
+ if value and not fileexists:
+ file(filename, 'a').close()
+ elif not value and fileexists:
+ os.unlink(filename)
+
+ daemon_export = property(_get_daemon_export, _set_daemon_export,
+ doc="git-daemon export of this repository.")
+ del _get_daemon_export
+ del _set_daemon_export
+
+ # Config property
+ @property
+ def config(self):
+ "repository configuration"
+ if not hasattr(self, '_config'):
+ self._config = Config(self)
+ return self._config
+
+ # Head property
+ @property
+ def head(self):
+ "repository head"
+ filename = os.path.join(self._path, 'HEAD')
+ symref = file(filename).read().strip()
+ if symref.startswith('ref: '):
+ # The HEAD is a branch tip.
+ ref = symref[5:]
+ commitname = self.run('rev-parse', ref, wait=True).strip()
+ return Commit(self, commitname, refname=ref)
+ else:
+ # We are not in a branch!
+ return Commit(self, symref)
+
+ @property
+ def heads(self):
+ "list all the repository heads"
+ format = "%(refname) %(objectname) %(objecttype)"
+ heads = {}
+ for line in self.run('for-each-ref', '--format=%s' % format, 'refs/heads'):
+ refname, objectname, objecttype = line.strip().split()
+ assert objecttype == 'commit'
+ heads[refname] = Commit(self, objectname, refname=refname)
+ return heads
+
+ def object(self, name):
+ if not issha1(name):
+ name = self.run('rev-parse', name, wait=True).strip()
+ objtype = self.run('cat-file', '-t', name, wait=True).strip()
+ if objtype == 'commit':
+ return Commit(self, name)
+ elif objtype == 'tree':
+ return Tree(self, name)
+ elif objtype == 'blob':
+ return Blob(self, name)
+ else:
+ raise Exception("Unhandled object type: '%s'" % objtype)
+
+ def rev_list(self, since='HEAD', to=None):
+ cmd = ['rev-list', '%s' % since]
+ if to is not None:
+ cmd.append('^%s' % to)
+
+ for line in self.run(*cmd):
+ yield Commit(self, line)
+
+ def clone(self, path):
+ "clone the repository into the provided path"
+ abspath = os.path.abspath(path)
+
+ cmd = ['clone', '--quiet']
+ if path.endswith('.git'):
+ cmd.append('--bare')
+ cmd.append(self._path)
+ cmd.append(abspath)
+ self.run(output=False, *cmd)
+ return Repository(abspath)