2021-08-09 11:22:11 +04:00

232 lines
7.1 KiB
Python

import errno
import json
import logging
import os
import time
from collections import defaultdict
from port_stats import lists
from port_stats import rpm_ffi
from port_stats import utils
# Module-wide logger, named after the importing module.
LOG = logging.getLogger(__name__)

# Sub-directories of ``<prefix>/archive`` that _task_dirs() yields as-is.
# ``archive/done`` is absent on purpose: _task_dirs() scans it separately.
_ARCHIVED_DIRS = (
    'eperm',
    'failed',
    'failure',
    'new',
    'postponed',
    'swept',
    'tested',
)
def _task_dirs(prefix):
    """Yield every directory under *prefix* that may hold task directories.

    The order is: *prefix* itself, then each entry of
    ``<prefix>/archive/done`` whose name starts with ``_``, then
    ``<prefix>/archive/<name>`` for every name in ``_ARCHIVED_DIRS``.
    The yielded paths are not checked for existence; only
    ``archive/done`` is actually listed (so ``os.listdir`` errors for it
    propagate to the consumer of the generator).
    """
    yield prefix

    done_root = os.path.join(prefix, 'archive', 'done')
    for entry in os.listdir(done_root):
        if not entry.startswith('_'):
            continue
        yield os.path.join(done_root, entry)

    for state_dir in _ARCHIVED_DIRS:
        yield os.path.join(prefix, 'archive', state_dir)
def task_pathes(prefix):
    """Yield ``(task_dir, task_id)`` pairs for the tasks found under *prefix*.

    Each directory produced by ``_task_dirs(prefix)`` is listed, and every
    entry whose name consists only of digits is reported, with the name
    converted to ``int``.
    """
    for parent in _task_dirs(prefix):
        numeric_names = (entry for entry in os.listdir(parent)
                         if entry.isdigit())
        for entry in numeric_names:
            yield parent, int(entry)
def load_task(task_path, cached_task=None, now=None):
    """Load the task description stored in ``<task_path>/info.json``.

    :param task_path: directory of a single task.
    :param cached_task: result of a previous load of the same task, if any.
    :param now: timestamp to record as ``load_time``; ``time.time()`` is
        used when it is not given (or falsy).
    :returns: the parsed info dict, extended with ``task_path``,
        ``load_time``, ``task_ctime`` (mtime of ``<task_path>/task/id``),
        ``task_mtime`` (mtime of ``info.json``), ``task_time`` (formatted
        ``task_mtime``), and with ``subtasks``/``try`` defaulted to
        ``{}``/``0``.  *cached_task* (possibly ``None``) is returned
        instead when the cached copy is considered current, when
        ``info.json`` is missing or empty, or when it cannot be read or
        parsed into a JSON object.
    :raises OSError: if ``info.json`` cannot be stat'ed for a reason other
        than ``ENOENT``.
    """
    if cached_task and '/archive/' in task_path:
        if cached_task.get('task_path') == task_path:
            # nothing really happens in archives
            return cached_task

    info_path = os.path.join(task_path, 'info.json')
    try:
        st = os.stat(info_path)
    except OSError as ex:
        if ex.errno != errno.ENOENT:
            raise
        LOG.debug("Failed to load %s: %s", info_path, ex)
        return cached_task

    if st.st_size == 0:
        LOG.debug("Ignoring empty file: %s", info_path)
        return cached_task

    try:
        mtime = st.st_mtime
        # info.json was not modified since the cached copy was loaded
        if cached_task and mtime < cached_task.get('load_time', 0):
            return cached_task
        ctime = os.path.getmtime(os.path.join(task_path, 'task', 'id'))
        with open(info_path) as f:
            data = json.load(f)
        # Valid JSON that is not an object (list, number, null, ...) must be
        # rejected here; otherwise the key assignments below would raise
        # outside of this guarded section.
        if not isinstance(data, dict):
            raise ValueError('expected a JSON object, got %s'
                             % type(data).__name__)
        # hack(iv@): sometimes info.json is moved before the state
        # is set to DONE. we need to fix this in girar, but for now
        # we opt for a simpler workaround:
        if '/archive/done/' in task_path:
            data['state'] = 'DONE'
    except Exception as ex:
        # best effort: keep whatever we knew about the task before
        LOG.error("Failed to load task info from %s: %s", task_path, ex)
        return cached_task

    data['task_path'] = task_path
    data['load_time'] = now or time.time()
    data['task_ctime'] = ctime
    data['task_mtime'] = mtime
    data['task_time'] = utils.format_timestamp(mtime)
    data.setdefault('subtasks', {})
    data.setdefault('try', 0)
    return data
def load_tasks(prefixes, repo=None, cache=None):
    """Load all tasks found under one or several task tree prefixes.

    :param prefixes: a single prefix path or an iterable of prefix paths;
        each one is scanned with ``task_pathes()``.
    :param repo: when given, only tasks whose ``repo`` field equals it
        are returned.
    :param cache: optional mapping from integer task id to a previously
        loaded task dict; the entry is handed to ``load_task()`` as the
        cached copy.
    :returns: list of task dicts sorted by their ``taskid`` field.  Tasks
        that fail to load are logged and skipped.
    """
    # ``basestring`` exists on Python 2 only; fall back to ``str`` so that
    # a single prefix string is also accepted on Python 3.
    try:
        string_types = basestring  # noqa: F821
    except NameError:
        string_types = str
    if isinstance(prefixes, string_types):
        prefixes = [prefixes]

    LOG.info("Loading tasks from %s%s", ' '.join(prefixes),
             " for " + repo if repo else "")
    task_dirs = [path
                 for prefix in prefixes
                 for path in task_pathes(prefix)]
    LOG.info("Found %s task dirs", len(task_dirs))

    tasks = []
    now = time.time()
    last_report = now
    for position, (task_dir, task_id) in enumerate(task_dirs, 1):
        task_path = os.path.join(task_dir, str(task_id))
        cached = cache.get(task_id) if cache else None
        try:
            info = load_task(task_path, cached, now)
        except Exception:
            LOG.error("Failed to load task from %s", task_path, exc_info=True)
        else:
            if info:
                tasks.append(info)
        # Report progress at most once per 15 seconds.  The position in
        # task_dirs is reported (not the number of tasks loaded so far),
        # so skipped tasks do not make the counter lag behind.
        if time.time() - last_report > 15:
            LOG.info("Still loading tasks, now at %s of %s (%s)",
                     position, len(task_dirs), task_path)
            last_report = time.time()

    if repo:
        tasks = [t for t in tasks if t.get('repo') == repo]

    return sorted(tasks, key=lambda t: t.get('taskid'))
def _format_srpm_subtask(subtask):
    """Render a subtask as ``srpm <value of its 'srpm' field>``."""
    return 'srpm ' + subtask.get('srpm')


def _format_delete_subtask(subtask):
    """Render a subtask as ``delete <value of its 'package' field>``."""
    return 'delete ' + subtask.get('package')


def _format_repo_subtask(subtask):
    """Render a subtask as ``<dir> <tag>``, using 'tag_name' if 'tag' is empty."""
    tag = subtask.get('tag')
    if not tag:
        tag = subtask.get('tag_name')
    return '%s %s' % (subtask.get('dir'), tag)


# Subtask 'type' value -> function turning the subtask dict into a string.
_FORMAT_SUBTASK = {
    'srpm': _format_srpm_subtask,
    'delete': _format_delete_subtask,
    'repo': _format_repo_subtask,
}
def format_subtask(subtask, extra_info=None):
    """Return a one-line description of *subtask*.

    The formatter is picked from ``_FORMAT_SUBTASK`` by the subtask's
    ``type`` field; ``utils.format_dict`` is used for any other type.
    When *extra_info* is given and a package name can be derived from the
    subtask, ``extra_info(package)`` is appended after a space.
    """
    formatter = _FORMAT_SUBTASK.get(subtask.get('type'), utils.format_dict)
    text = formatter(subtask)
    if not extra_info:
        return text

    package = subtask_package(subtask)
    if not package:
        return text
    return '%s %s' % (text, extra_info(package))
# Template for format_task(): a header line followed by the subtask lines.
_TASK_FORMAT = (
    '%(taskid)-6d [%(task_time)s] %(state)s try=%(try)s '
    '%(owner)s%(test_only)s%(deps)s\n%(subtasks)s'
)


def format_task(info, extra_info=None):
    """Return a multi-line, human readable description of a task.

    :param info: task dict; must provide ``taskid`` (integer),
        ``task_time``, ``state``, ``try``, ``owner`` and ``subtasks``
        (mapping of subtask number to subtask dict).  ``test_only`` and
        ``depends`` are optional.  The dict itself is not modified.
    :param extra_info: optional callable passed through to
        ``format_subtask()``.
    :returns: the header line, a newline, and one line per subtask
        ordered by subtask number.
    """
    fmt_args = info.copy()
    fmt_args['test_only'] = ' test-only' if info.get('test_only') else ''

    depends = sorted(utils.maybe_int(x) for x in info.get('depends', []))
    if depends:
        fmt_args['deps'] = ' depends=' + ','.join(str(x) for x in depends)
    else:
        fmt_args['deps'] = ''

    # .items() is available on both Python 2 and 3 (.iteritems() is not)
    subtasks = sorted((int(number), format_subtask(subtask, extra_info))
                      for number, subtask in info['subtasks'].items())
    fmt_args['subtasks'] = '\n'.join('%12d %s' % item for item in subtasks)
    return _TASK_FORMAT % fmt_args
def format_tasks_short(tasks, separator=','):
    """Return ``taskid=state`` items for *tasks*, highest task id first.

    :param tasks: iterable of task dicts (``None`` or empty gives ``''``).
        Every task must have ``taskid`` and ``state`` fields.
    :param separator: string placed between the items.
    """
    # reverse=True instead of negating the key: the order is the same for
    # numeric ids (ties keep their input order either way), and ids that
    # cannot be negated, such as strings, are accepted as well.
    ordered = sorted(tasks or (), key=lambda t: t.get('taskid', 0),
                     reverse=True)
    return separator.join('%(taskid)s=%(state)s' % t for t in ordered)
def task_packages(info):
    """Yield the package name of each subtask of the task *info*.

    Subtasks for which ``subtask_package()`` returns nothing (``None`` or
    an empty name) are skipped.
    """
    names = (subtask_package(subtask)
             for subtask in info['subtasks'].values())
    for name in names:
        if name:
            yield name
def subtask_package(subtask):
    """Return the package name a subtask refers to, or ``None``.

    The first of these fields that is present decides the result:
    ``pkgname`` and ``package`` are returned as-is; ``srpm`` is cut at the
    second dash from the right; ``dir`` gives its last path component with
    a trailing ``.git`` removed.  A subtask that has none of them yields
    ``None``, and is logged as an error unless ``userid`` is its only key.
    """
    for key in ('pkgname', 'package'):
        if key in subtask:
            return subtask[key]

    if 'srpm' in subtask:
        return subtask['srpm'].rsplit('-', 2)[0]

    if 'dir' in subtask:
        base = os.path.basename(subtask['dir'])
        return base[:-4] if base.endswith('.git') else base

    if list(subtask.keys()) != ['userid']:
        LOG.error('Failed to parse subtask %s',
                  utils.format_dict(subtask))
    return None
def subtask_is_updating(subtask, cur_nevr, strict=True):
    """Tell whether *subtask* builds an update of the package *cur_nevr*.

    :param subtask: subtask dict; only its ``srpm`` field is examined.
    :param cur_nevr: current package; its ``name``, ``epoch`` and ``evr``
        attributes are used.
    :param strict: when true the result of ``rpm_ffi.evr_cmp(current,
        srpm)`` must be negative; otherwise zero is accepted as well.
        (Presumably a negative result means the current package is older
        than the srpm -- verify against ``rpm_ffi``.)
    :returns: ``False`` if the subtask has no ``*.src.rpm`` name, if the
        name is not of the ``name-version-release`` form, or if it names
        another package; otherwise the outcome of the comparison.
    """
    srpm = subtask.get('srpm')
    if not srpm or not srpm.endswith('.src.rpm'):
        return False

    parts = srpm[:-len('.src.rpm')].rsplit('-', 2)
    if len(parts) != 3:
        # Not "name-version-release": nothing to compare, and unpacking
        # the parts below would raise ValueError.
        return False
    name, version, release = parts
    if name != cur_nevr.name:
        return False

    # the srpm file name carries no epoch, so the current one is reused
    srpm_nevr = lists.NEVR(name, cur_nevr.epoch, version, release)
    c = rpm_ffi.evr_cmp(cur_nevr.evr, srpm_nevr.evr, 'pkg')
    LOG.debug("cur=%s srpm=%s -> %s", cur_nevr, srpm, c)
    return c < 0 if strict else c <= 0
def task_is_updating(info, cur_nevr, update_time):
    """Tell whether any subtask of the task *info* updates *cur_nevr*.

    The comparison is strict only when the task's ``task_ctime`` is
    earlier than *update_time*; see ``subtask_is_updating()``.
    """
    # A task that is not older than the package may be a rebuild of it,
    # so for such tasks the non-strict comparison is requested.
    older_than_package = info['task_ctime'] < update_time
    for subtask in info['subtasks'].values():
        if subtask_is_updating(subtask, cur_nevr, older_than_package):
            return True
    return False
def last_update_times(task_list):
    """Map each package name to the newest ``task_mtime`` of a DONE task.

    Only tasks whose ``state`` is ``'DONE'`` are considered; package names
    come from ``task_packages()``.  Times that are not greater than zero
    never make it into the result.
    """
    newest = {}
    done_tasks = (task for task in task_list if task['state'] == 'DONE')
    for task in done_tasks:
        stamp = task['task_mtime']
        for package in task_packages(task):
            if newest.get(package, 0) < stamp:
                newest[package] = stamp
    return newest
def tasks_by_package(task_list):
    """Group tasks by the packages their subtasks refer to.

    :param task_list: iterable of task dicts.
    :returns: plain dict mapping a package name to the list of tasks that
        touch it, sorted by ``utils.maybe_int(task['taskid'])``.  A task
        whose subtasks cannot be processed is logged and otherwise
        ignored (packages recorded for it before the failure are kept).
    """
    result = defaultdict(list)
    for task in task_list:
        try:
            for package in task_packages(task):
                result[package].append(task)
        except Exception:
            LOG.error('Failed to parse task: %s',
                      utils.format_dict(task), exc_info=True)

    # .items() is available on both Python 2 and 3 (.iteritems() is not)
    return dict(
        (package, sorted(tasks, key=lambda t: utils.maybe_int(t['taskid'])))
        for package, tasks in result.items())