"""Interactive python shell with port_stats What's in the box: - load() -- (re)loads the task and packages information - use(repo) -- switch to using - load(repo) -- load stuff and start using the given repo instead of default - pt(name) -- prints information about the package tasks - spt(name) -- prints information about the package tasks, short version - spi(name) -- prints the short package information - gspi(pattern) -- prints the short package information for all the packages with name matching pattern ('g' for 'grep') - list_spi(pkgs_list) -- same as gspi, but for python lists of strings instead of patterns - ti(num) -- prints information about the task #num - fti(num) -- prints information about the task #num, full version (a dict) - logs(num, [idx]) -- print logs (logs/events.X.Y.log) for task #num - next_tasks() -- display all non-done tasks that would update packages - stats() -- prints packages statistics You can also enjoy autocompletion with . """ from __future__ import print_function import atexit import json import logging import os import re import readline import sys from pydoc import pager from port_stats import colorize from port_stats import lists from port_stats import logs as m_logs from port_stats import reports from port_stats import task_predicates as tp from port_stats import tasks from port_stats import utils LOG = logging.getLogger('port_stats.interactive') def get_task(taskid, task_dict): try: return task_dict[int(taskid)] except Exception: raise ValueError('Task not found: %s' % taskid) def package_tasks(pkg, data): ts = sorted(data[pkg], key=lambda t: int(t['taskid'])) return '\n'.join(tasks.format_task(t) for t in ts) # {{{ to= targets def dump(string): print(string) def append_to(filename): def _write(string): if os.path.exists(filename): LOG.info('File %s already exists, appending', filename) with open(filename, 'a') as f: f.write(string) f.write('\n') return _write # }}} # The Interactive Part CONFIG = None CURRENT = None REPOS = {} TASKS = {} PACKAGE_TASKS = {} BY_NAME = {} BY_COLOR = {} def repo_tasks(task_dict=None): for _, t in sorted(TASKS.iteritems()): if t['repo'] == CURRENT['new']: yield t def use(repo=None): global PACKAGE_TASKS, BY_NAME, BY_COLOR, CURRENT if repo is not None: try: CURRENT = (c for c in CONFIG['colorize'] if c['name'] == repo).next() except StopIteration: raise ValueError('Unknown repo: ' + repo) elif CURRENT is None: CURRENT = CONFIG['colorize'][0] LOG.info("Updating data structures for %s...", CURRENT['name']) sys.ps1 = CURRENT['name'] + '>>> ' PACKAGE_TASKS = tasks.tasks_by_package(repo_tasks()) BY_NAME, BY_COLOR = colorize.colorize(REPOS[CURRENT['base']], REPOS[CURRENT['new']]) LOG.info("DONE") def load(repo=None): global TASKS, REPOS TASKS = dict((t['taskid'], t) for t in tasks.load_tasks(CONFIG['tasks'], cache=TASKS)) REPOS = lists.read_all_srclists(CONFIG['repos']) LOG.info("Got %s tasks, %s repositories", len(TASKS), len(REPOS)) for name in sorted(REPOS): LOG.info(" %s: %s srpms", name, len(REPOS[name])) use(repo) def pt(pkg, to=print): to(package_tasks(pkg, PACKAGE_TASKS)) def spt(pkg, to=print): to(tasks.format_tasks_short(PACKAGE_TASKS.get(pkg))) def spi(pkg, to=print): to(reports.package_one_line(pkg, BY_NAME, PACKAGE_TASKS)) def _spi_by_predicate(pred, colors): colors = colors or colorize.COLORS return '\n'.join( reports.package_one_line(name, BY_NAME, PACKAGE_TASKS) for name in sorted(BY_NAME) if pred(name) and BY_NAME[name][0] in colors) def gspi(pattern, colors=None, to=print): p = re.compile(pattern) to(_spi_by_predicate(p.search, colors)) def list_spi(pkgs, colors=None, to=print): pset = frozenset(pkgs) lines = _spi_by_predicate(pset.__contains__, colors) unknown = pset - frozenset(BY_NAME) if unknown: lines += "\nNot in repos:\n\t" + "\n\t".join(sorted(unknown)) to(lines) def ti(num, to=print): to(reports.format_colored_task(get_task(num, TASKS), BY_NAME)) def display_tasks(infos, include=None, exclude=None, to=pager): tasks = (reports.format_colored_task(t, BY_NAME) for t in infos) if include: p = re.compile(include) tasks = (t for t in tasks if p.search(t)) if exclude: p = re.compile(exclude) tasks = (t for t in tasks if not p.search(t)) tasks = list(tasks) to("%s tasks\n\n%s" % (len(tasks), '\n\n'.join(tasks))) def next_tasks(include=None, exclude=None, to=pager): filters = [tp.not_(tp.has('state', 'DONE')), tp.has('owner', 'recycler'), tp.has_fresh_subtasks(BY_NAME)] display_tasks((t for t in repo_tasks() if all(f(t) for f in filters)), include, exclude, to) def fti(num, to=print): to(utils.format_dict(get_task(num, TASKS), indent=True)) def logs(num, idx=-1, to=pager): log_file = m_logs.task_event_logs(get_task(num, TASKS))[idx] with open(log_file, 'r') as f: log = f.read().decode('utf-8', errors='replace') to(log_file + ':\n\n' + log) def stats(names=None, to=dump): total = len(names) if names else len(REPOS[CURRENT['new']]) to(reports.color_totals(BY_COLOR, names, total, summary=True)) def interactive_setup(): # Bind ‘TAB’ to complete readline.parse_and_bind('tab:complete') # Set history file – ~\.pythonhistory histfile = os.path.join(os.environ['HOME'], '.pythonhistory') # Attempt read of histfile try: readline.read_history_file(histfile) except IOError: pass # Write history file at shell exit atexit.register(readline.write_history_file, histfile) # Configure logging logging.basicConfig( format='%(asctime)s %(levelname)-5s %(name)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S', stream=sys.stderr, level=logging.INFO) config = sys.argv[1] LOG.info("Loading configuraition file: %s", config) with open(config, 'r') as f: global CONFIG CONFIG = json.load(f) if __name__ == '__main__': interactive_setup() print(__doc__)