import os, os.path, re
+from datetime import datetime, timedelta, tzinfo
+
from stgit import exception, run, utils
+from stgit.config import config
class RepositoryException(exception.StgException):
pass
+class DateException(exception.StgException):
+    """Raised when a string cannot be parsed as the kind of value that
+    was asked for (the callers in this file pass 'date' or
+    'time zone')."""
+    def __init__(self, string, type):
+        # 'type' is the human-readable name of the expected kind of
+        # value; it is only used to build the message.
+        message = '"%s" is not a valid %s' % (string, type)
+        exception.StgException.__init__(self, message)
+
class DetachedHeadException(RepositoryException):
def __init__(self):
RepositoryException.__init__(self, 'Not on any branch')
return None
return d
+class TimeZone(tzinfo, Repr):
+    """Fixed-offset time zone built from a string such as '+0100' or
+    '-05:30'. The string it was built from doubles as its name."""
+    def __init__(self, tzstring):
+        match = re.match(r'^([+-])(\d{2}):?(\d{2})$', tzstring)
+        if match is None:
+            raise DateException(tzstring, 'time zone')
+        sign_char, hours, minutes = match.groups()
+        # The sign applies to the hours and to the minutes alike.
+        sign = {'+': 1, '-': -1}[sign_char]
+        try:
+            offset = timedelta(hours = sign * int(hours),
+                               minutes = sign * int(minutes))
+        except OverflowError:
+            raise DateException(tzstring, 'time zone')
+        self.__offset = offset
+        self.__name = tzstring
+    def utcoffset(self, dt):
+        """The fixed offset from UTC; dt is ignored."""
+        return self.__offset
+    def tzname(self, dt):
+        """The string this time zone was constructed from."""
+        return self.__name
+    def dst(self, dt):
+        """Always a zero timedelta."""
+        return timedelta(0)
+    def __str__(self):
+        return self.__name
+
+class Date(Repr):
+    """Immutable. A timestamp with a fixed UTC offset, parsed from
+    either '<seconds since epoch> <tz>' or
+    'YYYY-MM-DD HH:MM:SS <tz>'."""
+    def __init__(self, datestring):
+        # git-formatted date: seconds since the epoch, then the zone.
+        git_match = re.match(r'^(\d+)\s+([+-]\d\d:?\d\d)$', datestring)
+        if git_match:
+            seconds, zone = git_match.groups()
+            try:
+                self.__time = datetime.fromtimestamp(int(seconds),
+                                                     TimeZone(zone))
+            except ValueError:
+                raise DateException(datestring, 'date')
+            return
+
+        # iso-formatted date: calendar date, time of day, then the zone.
+        iso_match = re.match(
+            r'^(\d{4})-(\d{2})-(\d{2})\s+(\d{2}):(\d{2}):(\d{2})\s+'
+            r'([+-]\d\d:?\d\d)$', datestring)
+        if not iso_match:
+            raise DateException(datestring, 'date')
+        groups = iso_match.groups()
+        try:
+            year, month, day, hour, minute, second = [
+                int(field) for field in groups[:6]]
+            self.__time = datetime(year, month, day, hour, minute, second,
+                                   tzinfo = TimeZone(groups[6]))
+        except ValueError:
+            raise DateException(datestring, 'date')
+    def __str__(self):
+        return self.isoformat()
+    def isoformat(self):
+        """Human-friendly ISO 8601 format."""
+        # Format the naive part and the zone separately so that the
+        # zone is printed the way it was given.
+        naive = self.__time.replace(tzinfo = None)
+        return '%s %s' % (naive.isoformat(' '), self.__time.tzinfo)
+    @classmethod
+    def maybe(cls, datestring):
+        """Parse datestring, except that None and NoValue are handed
+        back unchanged."""
+        if datestring in (None, NoValue):
+            return datestring
+        return cls(datestring)
+
class Person(Repr):
"""Immutable."""
def __init__(self, name = NoValue, email = NoValue,
self.__name = d(name, 'name')
self.__email = d(email, 'email')
self.__date = d(date, 'date')
+ assert isinstance(self.__date, Date) or self.__date in [None, NoValue]
name = property(lambda self: self.__name)
email = property(lambda self: self.__email)
date = property(lambda self: self.__date)
assert m
name = m.group(1).strip()
email = m.group(2)
- date = m.group(3)
+ date = Date(m.group(3))
return cls(name, email, date)
+ @classmethod
+ def user(cls):
+     """Return the Person built from the 'user.name' and 'user.email'
+     configuration values. The object is created on the first call
+     and reused afterwards."""
+     # Inside this class cls.__user is name-mangled to
+     # cls._Person__user, so hasattr(cls, '__user') never finds the
+     # cached object. Reading the attribute directly goes through the
+     # same mangling as the assignment below.
+     try:
+         return cls.__user
+     except AttributeError:
+         cls.__user = cls(name = config.get('user.name'),
+                          email = config.get('user.email'))
+         return cls.__user
+ @classmethod
+ def author(cls):
+     """Return the Person described by the GIT_AUTHOR_NAME,
+     GIT_AUTHOR_EMAIL and GIT_AUTHOR_DATE environment variables, with
+     user() passed as the defaults. The object is created on the
+     first call and reused afterwards."""
+     # Inside this class cls.__author is name-mangled to
+     # cls._Person__author, so hasattr(cls, '__author') never finds
+     # the cached object. Reading the attribute directly goes through
+     # the same mangling as the assignment below.
+     try:
+         return cls.__author
+     except AttributeError:
+         cls.__author = cls(
+             name = os.environ.get('GIT_AUTHOR_NAME', NoValue),
+             email = os.environ.get('GIT_AUTHOR_EMAIL', NoValue),
+             date = Date.maybe(os.environ.get('GIT_AUTHOR_DATE', NoValue)),
+             defaults = cls.user())
+         return cls.__author
+ @classmethod
+ def committer(cls):
+     """Return the Person described by the GIT_COMMITTER_NAME,
+     GIT_COMMITTER_EMAIL and GIT_COMMITTER_DATE environment
+     variables, with user() passed as the defaults. The object is
+     created on the first call and reused afterwards."""
+     # Inside this class cls.__committer is name-mangled to
+     # cls._Person__committer, so hasattr(cls, '__committer') never
+     # finds the cached object. Reading the attribute directly goes
+     # through the same mangling as the assignment below.
+     try:
+         return cls.__committer
+     except AttributeError:
+         cls.__committer = cls(
+             name = os.environ.get('GIT_COMMITTER_NAME', NoValue),
+             email = os.environ.get('GIT_COMMITTER_EMAIL', NoValue),
+             date = Date.maybe(
+                 os.environ.get('GIT_COMMITTER_DATE', NoValue)),
+             defaults = cls.user())
+         return cls.__committer
class Tree(Repr):
"""Immutable."""
) % (tree, parents, self.author, self.committer, self.message)
@classmethod
def parse(cls, repository, s):
- cd = cls()
+ cd = cls(parents = [])
lines = list(s.splitlines(True))
for i in xrange(len(lines)):
line = lines[i].strip()
self.__refs = Refs(self)
self.__trees = ObjectCache(lambda sha1: Tree(sha1))
self.__commits = ObjectCache(lambda sha1: Commit(self, sha1))
+ self.__default_index = None
+ self.__default_worktree = None
+ self.__default_iw = None
env = property(lambda self: { 'GIT_DIR': self.__git_dir })
@classmethod
def default(cls):
).output_one_line())
except run.RunException:
raise RepositoryException('Cannot find git repository')
+ @property
def default_index(self):
- return Index(self, (os.environ.get('GIT_INDEX_FILE', None)
- or os.path.join(self.__git_dir, 'index')))
+     # The Index for $GIT_INDEX_FILE, or for the 'index' file in the
+     # git directory when that variable is unset or empty. Created on
+     # first access and reused afterwards.
+     if self.__default_index is None:
+         self.__default_index = Index(
+             self, (os.environ.get('GIT_INDEX_FILE', None)
+                    or os.path.join(self.__git_dir, 'index')))
+     return self.__default_index
def temp_index(self):
return Index(self, self.__git_dir)
+ @property
def default_worktree(self):
- path = os.environ.get('GIT_WORK_TREE', None)
- if not path:
- o = run.Run('git', 'rev-parse', '--show-cdup').output_lines()
- o = o or ['.']
- assert len(o) == 1
- path = o[0]
- return Worktree(path)
+     # The Worktree for $GIT_WORK_TREE, or for the path printed by
+     # 'git rev-parse --show-cdup' when that variable is unset or
+     # empty. Created on first access and reused afterwards.
+     if self.__default_worktree is None:
+         path = os.environ.get('GIT_WORK_TREE', None)
+         if not path:
+             o = run.Run('git', 'rev-parse', '--show-cdup').output_lines()
+             # No output lines at all is treated as '.'.
+             o = o or ['.']
+             assert len(o) == 1
+             path = o[0]
+         self.__default_worktree = Worktree(path)
+     return self.__default_worktree
+ @property
def default_iw(self):
- return IndexAndWorktree(self.default_index(), self.default_worktree())
+     # The IndexAndWorktree pairing default_index with
+     # default_worktree. Created on first access and reused
+     # afterwards.
+     if self.__default_iw is None:
+         self.__default_iw = IndexAndWorktree(self.default_index,
+                                              self.default_worktree)
+     return self.__default_iw
directory = property(lambda self: self.__git_dir)
refs = property(lambda self: self.__refs)
def cat_object(self, sha1):
for attr, v2 in (('name', 'NAME'), ('email', 'EMAIL'),
('date', 'DATE')):
if getattr(p, attr) != None:
- env['GIT_%s_%s' % (v1, v2)] = getattr(p, attr)
+ env['GIT_%s_%s' % (v1, v2)] = str(getattr(p, attr))
sha1 = self.run(c, env = env).raw_input(commitdata.message
).output_one_line()
return self.get_commit(sha1)
return None
finally:
index.delete()
+ def apply(self, tree, patch_text):
+     """Apply patch_text on top of tree using a temporary index.
+     Returns the resulting tree, or None if the patch could not be
+     applied. An empty patch gives back tree itself."""
+     assert isinstance(tree, Tree)
+     if not patch_text:
+         return tree
+     scratch = self.temp_index()
+     try:
+         scratch.read_tree(tree)
+         try:
+             scratch.apply(patch_text)
+             new_tree = scratch.write_tree()
+         except MergeException:
+             new_tree = None
+         return new_tree
+     finally:
+         # The temporary index is removed whatever happened above.
+         scratch.delete()
+ def diff_tree(self, t1, t2, diff_opts):
+     """Return the raw output of 'git diff-tree -p' for the trees t1
+     and t2, with diff_opts inserted before the two tree ids."""
+     assert isinstance(t1, Tree)
+     assert isinstance(t2, Tree)
+     argv = ['git', 'diff-tree', '-p']
+     argv.extend(diff_opts)
+     argv.extend([t1.sha1, t2.sha1])
+     return self.run(argv).raw_output()
class MergeException(exception.StgException):
pass
+class MergeConflictException(MergeException):
+    """MergeException raised when 'git merge-recursive' exits with
+    status 1."""
+
class Index(RunWithEnv):
def __init__(self, repository, filename):
self.__repository = repository
"""In-index merge, no worktree involved."""
self.run(['git', 'read-tree', '-m', '-i', '--aggressive',
base.sha1, ours.sha1, theirs.sha1]).no_output()
+ def apply(self, patch_text):
+     """Apply patch_text to this index with 'git apply --cached'; no
+     worktree is involved. Raises MergeException if the command
+     fails."""
+     try:
+         git_apply = self.run(['git', 'apply', '--cached'])
+         git_apply.raw_input(patch_text).no_output()
+     except run.RunException:
+         raise MergeException('Patch does not apply cleanly')
def delete(self):
if os.path.isfile(self.__filename):
os.remove(self.__filename)
+ def conflicts(self):
+     """Return the set of paths that 'git ls-files --unmerged' lists
+     for this index."""
+     listing = self.run(['git', 'ls-files', '-z', '--unmerged']
+                        ).raw_output()
+     # With -z every entry is NUL-terminated, so the piece after the
+     # last NUL is dropped.
+     entries = listing.split('\0')[:-1]
+     paths = set()
+     for entry in entries:
+         # The path is everything after the first tab.
+         _stat, path = entry.split('\t', 1)
+         paths.add(path)
+     return paths
class Worktree(object):
def __init__(self, directory):
assert isinstance(ours, Tree)
assert isinstance(theirs, Tree)
try:
- self.run(['git', 'merge-recursive', base.sha1, '--', ours.sha1,
- theirs.sha1],
- env = { 'GITHEAD_%s' % base.sha1: 'ancestor',
- 'GITHEAD_%s' % ours.sha1: 'current',
- 'GITHEAD_%s' % theirs.sha1: 'patched'}
- ).cwd(self.__worktree.directory).discard_output()
+ r = self.run(['git', 'merge-recursive', base.sha1, '--', ours.sha1,
+ theirs.sha1],
+ env = { 'GITHEAD_%s' % base.sha1: 'ancestor',
+ 'GITHEAD_%s' % ours.sha1: 'current',
+ 'GITHEAD_%s' % theirs.sha1: 'patched'}
+ ).cwd(self.__worktree.directory)
+ r.discard_output()
except run.RunException, e:
- raise MergeException('Index/worktree dirty')
+ if r.exitcode == 1:
+ raise MergeConflictException()
+ else:
+ raise MergeException('Index/worktree dirty')
def changed_files(self):
return self.run(['git', 'diff-files', '--name-only']).output_lines()
def update_index(self, files):