#!/usr/bin/python3
"""Build, checksum, sign and publish cloud images with mkimage-profiles."""

from typing import Dict, List, Optional

import argparse
import contextlib
import datetime
import glob
import logging
import os
import re
import subprocess
import sys

import yaml

PROG = 'cloud-build'


class CB:
    """Build cloud images as described by a YAML configuration file.

    The instance keeps its state (images, work tree, build output and log
    file) under the per-user data directory returned by `get_data_dir()`.
    """

    def __init__(self, config: str, system_datadir: str) -> None:
        """Load *config*, create the working directories and set up logging.

        Args:
            config: path to the YAML configuration file.
            system_datadir: system data directory; only stored on the
                instance here.
        """
        self.parse_config(config)

        data_dir = get_data_dir()
        self.images_dir = data_dir + 'images/'
        self.work_dir = data_dir + 'work/'
        self.out_dir = data_dir + 'out/'
        self.system_datadir = system_datadir

        self.date = datetime.date.today().strftime('%Y%m%d')
        self.service_default_state = 'enabled'

        self.ensure_dirs()
        logging.basicConfig(
            filename=f'{data_dir}{PROG}.log',
            format='%(levelname)s:%(asctime)s - %(message)s',
        )
        self.log = logging.getLogger(PROG)
        self.log.setLevel(self.log_level)
        self.info(f'Start {PROG}')

    @contextlib.contextmanager
    def pushd(self, new_dir: str):
        """Temporarily change the working directory to *new_dir*.

        The previous directory is restored when the `with` body finishes,
        including when it raises.
        """
        previous_dir = os.getcwd()
        self.debug(f'Pushd to {new_dir}')
        os.chdir(new_dir)
        try:
            yield
        finally:
            self.debug(f'Popd from {new_dir}')
            os.chdir(previous_dir)

    def parse_config(self, config: str) -> None:
        """Read the YAML file *config* and store its settings on the instance.

        Optional keys fall back to defaults; `remote`, `key`, `images`,
        `branches` and each branch's `arches` are required.

        Raises:
            Exception: when a required key is missing (the message is also
                printed to stderr, since logging is not configured yet).
        """
        with open(config) as f:
            # An empty file loads as None; treat it as an empty mapping so
            # that the "required parameter" error below is reported.
            cfg = yaml.safe_load(f) or {}

        self.mkimage_profiles_git = os.path.expanduser(
            cfg.get('mkimage_profiles_git', '')
        )

        self.log_level = getattr(logging, cfg.get('log_level', 'INFO').upper())

        self._repository_url = cfg.get('repository_url',
                                       'file:///space/ALT/{branch}')

        self.bad_arches = cfg.get('bad_arches', [])

        self._packages = cfg.get('packages', {})
        self._services = cfg.get('services', {})

        try:
            self._remote = os.path.expanduser(cfg['remote'])
            self.key = cfg['key']
            if isinstance(self.key, int):
                # YAML parses an all-digit key as an integer; turn it back
                # into an upper-case hexadecimal string.
                self.key = '{:X}'.format(self.key)
            self._images = cfg['images']
            self._branches = cfg['branches']
            for branch in self._branches.values():
                # An architecture listed without a value loads as None;
                # normalize it to an empty mapping.
                branch['arches'] = {
                    arch: {} if options is None else options
                    for arch, options in branch['arches'].items()
                }
        except KeyError as e:
            msg = f'Required parameter {e} does not set in config'
            print(msg, file=sys.stderr)
            raise Exception(msg) from e

    def info(self, msg: str) -> None:
        """Log *msg* at INFO level."""
        self.log.info(msg)

    def debug(self, msg: str) -> None:
        """Log *msg* at DEBUG level."""
        self.log.debug(msg)

    def error(self, msg: str) -> None:
        """Log *msg* at ERROR level and raise `Exception(msg)`."""
        self.log.error(msg)
        raise Exception(msg)

    def remote(self, branch: str) -> str:
        """Return the configured remote with `{branch}` substituted."""
        return self._remote.format(branch=branch)

    def repository_url(self, branch: str, arch: str) -> str:
        """Return the repository URL for *branch* and *arch*.

        The most specific setting wins: per-arch, then per-branch, then the
        global one. `{branch}` and `{arch}` placeholders are substituted.
        """
        url = self._branches[branch]['arches'][arch].get('repository_url')
        if url is None:
            url = self._branches[branch].get('repository_url',
                                             self._repository_url)
        return url.format(branch=branch, arch=arch)

    def call(
        self,
        cmd: List[str],
        *,
        stdout_to_file: str = '',
        fail_on_error: bool = True,
    ) -> None:
        """Run the external command *cmd*.

        Args:
            cmd: program and arguments.
            stdout_to_file: when non-empty, the command's standard output is
                captured and written to this path.
            fail_on_error: when true, a non-zero return code is reported
                through `self.error`, which raises.
        """
        string = ' '.join(cmd)
        self.debug(f'Call `{string}`')

        output = b''
        if stdout_to_file:
            # subprocess.run drains the pipe while waiting, so a chatty
            # child cannot block on a full pipe buffer.
            proc = subprocess.run(cmd, stdout=subprocess.PIPE)
            rc = proc.returncode
            output = proc.stdout
        else:
            rc = subprocess.call(cmd)

        if fail_on_error and rc != 0:
            self.error(f'Command `{string}` failed with {rc} return code')

        if stdout_to_file:
            with open(stdout_to_file, 'w') as f:
                f.write(output.decode())

    def ensure_dirs(self) -> None:
        """Create the working directories.

        Every string attribute whose name ends in `_dir` is created, plus
        one sub-directory of the images directory per branch.
        """
        for attr in dir(self):
            if attr.endswith('_dir'):
                value = getattr(self, attr)
                # Methods such as clear_imager_dir also match the suffix.
                if isinstance(value, str):
                    os.makedirs(value, exist_ok=True)
        for branch in self.branches:
            os.makedirs(self.images_dir + branch, exist_ok=True)

    def generate_apt_files(self) -> None:
        """Write an apt.conf and a sources.list per (branch, arch) pair."""
        apt_dir = self.work_dir + 'apt'
        os.makedirs(apt_dir, exist_ok=True)
        for branch in self.branches:
            for arch in self.arches_by_branch(branch):
                repo = self.repository_url(branch, arch)
                with open(f'{apt_dir}/apt.conf.{branch}.{arch}', 'w') as f:
                    apt_conf = f'''
Dir::Etc::main "/dev/null";
Dir::Etc::parts "/var/empty";
Dir::Etc::SourceList "{apt_dir}/sources.list.{branch}.{arch}";
Dir::Etc::SourceParts "/var/empty";
Dir::Etc::preferences "/dev/null";
Dir::Etc::preferencesparts "/var/empty";
'''
                    f.write(apt_conf)

                with open(f'{apt_dir}/sources.list.{branch}.{arch}', 'w') as f:
                    sources_list = f'rpm {repo} {arch} classic\n'
                    # Arches listed in bad_arches get no noarch line.
                    if arch not in self.bad_arches:
                        sources_list += f'rpm {repo} noarch classic\n'
                    f.write(sources_list)

    def escape_branch(self, branch: str) -> str:
        """Return *branch* with every dot replaced by an underscore."""
        return branch.replace('.', '_')

    def ensure_mkimage_profiles(self, update: bool = False) -> None:
        """Checks that mkimage-profiles exists or clones it.

        An existing checkout is pulled only when *update* is true. The
        per-image make rules are then (re)written to `conf.d/<PROG>.mk`
        inside the checkout and the apt files are regenerated.
        """
        def add_rule(variable: str, value: str) -> str:
            return f'\n\t@$(call add,{variable},{value})'

        url = self.mkimage_profiles_git
        if url == '':
            url = (
                'git://'
                + 'git.altlinux.org/'
                + 'people/mike/packages/mkimage-profiles.git'
            )
        # NOTE(review): this changes the process working directory and does
        # not restore it afterwards.
        os.chdir(self.work_dir)
        if os.path.isdir('mkimage-profiles'):
            if update:
                with self.pushd('mkimage-profiles'):
                    self.info('Updating mkimage-profiles')
                    # A failed pull is tolerated: the existing tree is used.
                    self.call(['git', 'pull'], fail_on_error=False)
        else:
            self.info('Downloading mkimage-profiles')
            self.call(['git', 'clone', url, 'mkimage-profiles'])

        # create file with proper brandings
        with self.pushd('mkimage-profiles'):
            with open(f'conf.d/{PROG}.mk', 'w') as f:
                for image in self.images:
                    target = self.target_by_image(image)
                    for branch in self.branches:
                        ebranch = self.escape_branch(branch)

                        requires = [target]
                        requires.extend(self.requires_by_branch(branch))
                        requires_s = ' '.join(requires)

                        branding = self.branding_by_branch(branch)
                        if branding:
                            branding = f'\n\t@$(call set,BRANDING,{branding})'

                        rules = [branding]
                        rules.extend(
                            add_rule('BASE_PACKAGES', package)
                            for package in self.packages(image, branch)
                        )
                        rules.extend(
                            add_rule('DEFAULT_SERVICES_ENABLE', service)
                            for service in self.enabled_services(image, branch)
                        )
                        rules.extend(
                            add_rule('DEFAULT_SERVICES_DISABLE', service)
                            for service in self.disabled_services(image, branch)
                        )
                        rules_s = ''.join(rules)

                        s = f'{target}_{ebranch}: {requires_s}; @:{rules_s}'
                        print(s, file=f)

        self.generate_apt_files()

    @property
    def branches(self) -> List[str]:
        """Names of the configured branches."""
        return list(self._branches.keys())

    def arches_by_branch(self, branch: str) -> List[str]:
        """Return the architectures configured for *branch*."""
        return list(self._branches[branch]['arches'].keys())

    def branding_by_branch(self, branch: str) -> str:
        """Return the branding of *branch*, or '' when none is set."""
        return self._branches[branch].get('branding', '')

    def requires_by_branch(self, branch: str) -> List[str]:
        """Return the extra make prerequisites of *branch* (may be empty)."""
        return self._branches[branch].get('requires', [])

    @property
    def images(self) -> List[str]:
        """Names of the configured images."""
        return list(self._images.keys())

    def kinds_by_image(self, image: str) -> List[str]:
        """Return the `kinds` list configured for *image*."""
        return self._images[image]['kinds']

    def target_by_image(self, image: str) -> str:
        """Return the make target configured for *image*."""
        return self._images[image]['target']

    def skip_arch(self, image: str, arch: str) -> bool:
        """Return True when *arch* is in the image's `exclude_arches`."""
        return arch in self._images[image].get('exclude_arches', [])

    def get_items(
        self,
        data: Dict,
        image: str,
        branch: str,
        state_re: Optional[str] = None,
        default_state: Optional[str] = None,
    ) -> List[str]:
        """Select the keys of *data* that apply to *image* and *branch*.

        Args:
            data: mapping of item name to a mapping of constraints.
            image: image name the items are selected for.
            branch: branch name the items are selected for.
            state_re: regular expression the item's state must match from
                its start; None means '' (matches any state).
            default_state: state assumed for items without a `state`
                constraint; None means *state_re*.

        Returns:
            The matching item names, in *data* order.
        """
        items = []

        if state_re is None:
            state_re = ''
        if default_state is None:
            default_state = state_re

        for item, constraints in data.items():
            if (
                image in constraints.get('exclude_images', [])
                or branch in constraints.get('exclude_branches', [])
            ):
                continue

            # Empty means no constraint: e.g. all images
            images = constraints.get('images', [image])
            # NOTE(review): the key read here is 'branch' although the
            # sibling keys are plural ('images', 'exclude_branches') --
            # confirm against the documented config schema.
            branches = constraints.get('branch', [branch])

            state = constraints.get('state', default_state)

            if (
                image in images
                and branch in branches
                and re.match(state_re, state)
            ):
                items.append(item)

        return items

    def packages(self, image: str, branch: str) -> List[str]:
        """Return the packages selected for *image* on *branch*."""
        return self.get_items(self._packages, image, branch)

    def enabled_services(self, image: str, branch: str) -> List[str]:
        """Return the services whose state matches `enabled?`."""
        return self.get_items(
            self._services,
            image,
            branch,
            'enabled?',
            self.service_default_state,
        )

    def disabled_services(self, image: str, branch: str) -> List[str]:
        """Return the services whose state matches `disabled?`."""
        return self.get_items(
            self._services,
            image,
            branch,
            'disabled?',
            self.service_default_state,
        )

    def build_tarball(
        self,
        target: str,
        branch: str,
        arch: str,
        kind: str
    ) -> str:
        """Build one image with make unless today's output already exists.

        Returns:
            Path of the built file in the output directory.

        Raises:
            Exception: (through `self.error`) when make fails or the
                expected output file is missing afterwards.
        """
        self.ensure_mkimage_profiles()

        target = f'{target}_{self.escape_branch(branch)}'
        # Keep only the part of the target after its last slash.
        image = re.sub(r'.*/', '', target)
        full_target = f'{target}.{kind}'
        tarball = f'{self.out_dir}{image}-{self.date}-{arch}.{kind}'
        apt_dir = self.work_dir + 'apt'
        with self.pushd(self.work_dir + 'mkimage-profiles'):
            if os.path.exists(tarball):
                self.info(f'Skip building of {full_target} {arch}')
            else:
                cmd = [
                    'make',
                    f'APTCONF={apt_dir}/apt.conf.{branch}.{arch}',
                    f'ARCH={arch}',
                    f'IMAGE_OUTDIR={self.out_dir.rstrip("/")}',
                    full_target,
                ]
                self.info(f'Begin building of {full_target} {arch}')
                self.call(cmd)
                if os.path.exists(tarball):
                    self.info(f'End building of {full_target} {arch}')
                else:
                    self.error(f'Fail building of {full_target} {arch}')

        return tarball

    def image_path(self, image: str, branch: str, arch: str, kind: str) -> str:
        """Return the publication path of an image inside the images dir."""
        return (
            f'{self.images_dir}{branch}/'
            f'alt-{branch.lower()}-{image}-{arch}.{kind}'
        )

    def copy_image(self, src: str, dst: str) -> None:
        """Hard-link *src* to *dst*."""
        os.link(src, dst)

    def clear_imager_dir(self) -> None:
        """Remove every entry from each branch's images directory."""
        for branch in self.branches:
            directory = f'{self.images_dir}{branch}'
            for path in os.listdir(directory):
                os.unlink(f'{directory}/{path}')

    def create_images(self) -> None:
        """Build every configured image, then checksum and sign per branch."""
        self.clear_imager_dir()
        for branch in self.branches:
            images_in_branch = []
            for image in self.images:
                target = self.target_by_image(image)
                for arch in self.arches_by_branch(branch):
                    if self.skip_arch(image, arch):
                        continue
                    for kind in self.kinds_by_image(image):
                        tarball = self.build_tarball(
                            target, branch, arch, kind,
                        )
                        image_path = self.image_path(image, branch, arch, kind)
                        self.copy_image(tarball, image_path)
                        images_in_branch.append(image_path)
            self.checksum_sign(images_in_branch)

    def checksum_sign(self, images: List[str]) -> None:
        """Write a SHA256SUM file for *images* and sign it with gpg2.

        All paths are assumed to share the directory of the first one; the
        commands run from there with bare file names.

        Raises:
            Exception: (through `self.error`) when *images* is empty.
        """
        if not images:
            self.error('Empty list of images to checksum_sign')

        sum_file = 'SHA256SUM'
        with self.pushd(os.path.dirname(images[0])):
            files = [os.path.basename(x) for x in images]
            string = ','.join(files)

            cmd = ['sha256sum'] + files
            self.info(f'Calculate checksum of {string}')
            self.call(cmd, stdout_to_file=sum_file)

            self.info(f'Sign checksum of {string}')
            self.call(['gpg2', '--yes', '-basu', self.key, sum_file])

    def kick(self) -> None:
        """Run `kick` over ssh on the remote host, if the remote names one.

        The host is the part of the configured remote before its first
        colon; a remote without a colon is left alone.
        """
        host, colon, _ = self._remote.partition(':')
        if colon:
            self.call(['ssh', host, 'kick'])

    def sync(self) -> None:
        """Create all images, rsync each branch to its remote, then kick."""
        self.create_images()
        for branch in self.branches:
            remote = self.remote(branch)
            files = glob.glob(f'{self.images_dir}{branch}/*')
            cmd = ['rsync', '-v'] + files + [remote]
            self.call(cmd)

        self.kick()


def get_data_dir() -> str:
    """Return the per-user data directory of the program, with trailing '/'.

    `$XDG_DATA_HOME` is used when set, `~/.local/share` otherwise.
    """
    data_dir = (os.getenv('XDG_DATA_HOME',
                          os.path.expanduser('~/.local/share'))
                + f'/{PROG}/')
    return data_dir


def parse_args() -> argparse.Namespace:
    """Parse the command line; `data_dir` always ends with a slash."""
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        '-c',
        '--config',
        default=f'/etc/{PROG}/config.yaml',
        help='path to config',
    )
    parser.add_argument(
        '-d',
        '--data-dir',
        default=f'/usr/share/{PROG}',
        help='system data directory',
    )
    args = parser.parse_args()

    if not args.data_dir.endswith('/'):
        args.data_dir += '/'

    return args


def main() -> None:
    """Entry point: build and publish everything the config describes."""
    args = parse_args()
    cloud_build = CB(args.config, args.data_dir)
    cloud_build.sync()


if __name__ == '__main__':
    main()