Add decorators to test.call

This commit is contained in:
Mikhail Gordeev 2020-04-21 22:23:56 +03:00
parent 5ae1d19a2c
commit 690e24ba44

View File

@ -1,16 +1,137 @@
from pathlib import Path from pathlib import Path
from collections.abc import Iterable, Callable
import datetime import datetime
import os import os
import random import random
import re import re
import subprocess import subprocess
import sys
import time
SUBPROCESS_CALL = subprocess.call SUBPROCESS_CALL = subprocess.call
DEFAULT = object() DEFAULT = object()
def one_arg(args, kwargs):
return not kwargs and len(args) == 1 and isinstance(args[0], Callable)
# apply decorator_factory if cond is True on args
def _make_conditional_d(decorator_factory):
def df(*df_args, cond=None, inverse=False, **df_kwargs):
if cond is None:
cond = lambda x: True # noqa E731
def decorator(prog):
def f(args):
if cond(args) != inverse:
return decorator_factory(*df_args, **df_kwargs)(prog)(args)
else:
return prog(args)
return f
return decorator
return df
def _decorator_add_factory_d(decorator):
def df(*df_args, **df_kwargs):
if one_arg(df_args, df_kwargs):
return decorator(df_args[0])
else:
def d(f):
return decorator(f)
return d
return df
def _factory_add_decorator_df(*dff_args, **dff_kwargs):
def dff(decorator_factory):
def df(*df_args, **df_kwargs):
if one_arg(df_args, df_kwargs):
return decorator_factory(*dff_args, **dff_kwargs)(df_args[0])
else:
def df1(d):
if df_args:
args = df_args
else:
args = dff_args
kwargs = dff_kwargs.copy()
kwargs.update(df_kwargs)
return decorator_factory(*args, **kwargs)(d)
return df1
return df
return dff
@_factory_add_decorator_df()
@_make_conditional_d
@_decorator_add_factory_d
def print_d(func):
def f(*args, **kwargs):
sargs = list(args)
sargs.extend(f'{k}={v}' for k, v in kwargs.items())
print(f'{func.__name__}({sargs})', file=sys.stdout)
return func(*args)
return f
@_factory_add_decorator_df()
@_make_conditional_d
@_decorator_add_factory_d
def nop_d(func):
def f(*args, **kwargs):
pass
return f
@_make_conditional_d
def before_d(func_before, *fb_args, **fb_kwargs):
def d(func):
def f(*args, **kwargs):
func_before(*fb_args, **fb_kwargs)
return func(*args, **kwargs)
return f
return d
@_make_conditional_d
def after_d(func_after, *fa_args, save_rc=True, **fa_kwargs):
def d(func):
def f(*args, **kwargs):
old_rc = func(*args, **kwargs)
new_rc = func_after(*fa_args, **fa_kwargs)
if save_rc:
return old_rc
else:
return new_rc
return f
return d
@_factory_add_decorator_df(1)
@_make_conditional_d
def sleep_d(s):
def d(func):
def f(*args, **kwargs):
time.sleep(s)
return func(*args, **kwargs)
return f
return d
@_factory_add_decorator_df(1)
@_make_conditional_d
def return_d(rc):
def d(func):
def f(*args, **kwargs):
func(*args, **kwargs)
return rc
return f
return d
def error_call(args): def error_call(args):
raise Exception(f'Not implemened call for `{args}`') raise Exception(f'Not implemened call for `{args}`')
@ -54,8 +175,14 @@ def rsync(args):
return SUBPROCESS_CALL(args, stdout=subprocess.DEVNULL) return SUBPROCESS_CALL(args, stdout=subprocess.DEVNULL)
def decorate(decorators, func):
for decorator in reversed(decorators):
func = decorator(func)
return func
class Call(): class Call():
def __init__(self, progs=None): def __init__(self, progs=None, decorators=None):
self.progs = { self.progs = {
'git': git, 'git': git,
'make': make, 'make': make,
@ -67,11 +194,16 @@ class Call():
if progs is not None: if progs is not None:
self.progs.update(progs) self.progs.update(progs)
if decorators is None:
decorators = {}
self.decorators = decorators
def __call__(self, args): def __call__(self, args):
rc = None rc = None
prog = args[0] prog = args[0]
if prog in self.progs: func = self.progs.get(prog, DEFAULT)
rc = self.progs[prog](args) decorators = self.decorators.get(prog, [])
else: if not isinstance(decorators, Iterable):
rc = self.progs[DEFAULT](args) decorators = [decorators]
rc = decorate(decorators, func)(args)
return rc return rc