Source code for ycleptic.src.walkers

# Author: Cameron F. Abrams <cfa22@drexel.edu>

"""
Recursive functions that traverse the attribute tree for setting values
"""
from __future__ import annotations
import logging

logger = logging.getLogger(__name__)

from .dictthings import special_update
from .stringthings import raise_clean

def make_def(L: list[dict], H: dict, *args):
    """
    Fill ``H`` with the default values found at one location of the base config.

    The names in ``args`` are followed one level at a time through the
    ``"attributes"`` lists, starting from ``L``.  At the last name, the
    defaults of that attribute's children are written to ``H`` (``None`` for
    a child without a ``"default"``); if the attribute has no children, its
    own default (or ``None``) is written under its own name.  With no names
    at all, nothing happens.

    Parameters
    ----------
    L : list of dict
        The attribute specifications at the current level (the "base config").
    H : dict
        The dictionary that receives the default values (the "user config");
        modified in place.
    args : tuple
        The attribute names still to be followed, outermost first.

    Raises
    ------
    ValueError
        If a name is not found at its level.  For the final name the error
        is handed to ``raise_clean``; for intermediate names it is raised
        directly.
    """
    if not args:
        return

    target, *deeper = args
    known = [entry["name"] for entry in L]

    if deeper:
        # Not at the end of the path yet: descend into the named attribute.
        try:
            position = known.index(target)
        except ValueError:
            raise ValueError(f'{target} is not a recognized attribute')
        make_def(L[position]["attributes"], H, *deeper)
        return

    try:
        position = known.index(target)
    except ValueError:
        raise_clean(ValueError(f'{target} is not a recognized attribute'))
    spec = L[position]

    if "attributes" in spec:
        # Only the children's defaults are reported, not the parent's.
        for child in spec["attributes"]:
            H[child["name"]] = child.get("default")
    else:
        H[spec["name"]] = spec.get("default")
def mwalk(D1: dict, D2: dict):
    """
    Merge the base config ``D2`` into the base config ``D1``, in place.

    Attributes of ``D2`` are matched to attributes of ``D1`` by ``'name'``.
    A matched pair that both carry ``'attributes'`` is merged recursively;
    any other matched pair is merged with ``dict.update`` so that the keys of
    ``D2`` win.  Attributes of ``D2`` with no match are appended to
    ``D1['attributes']``.

    Parameters
    ----------
    D1 : dict
        The base config dictionary to be updated.
    D2 : dict
        The base config dictionary holding the values to merge into ``D1``
        (per the original documentation, typically a partial base config
        read from a user dotfile).
    """
    assert 'attributes' in D1
    assert 'attributes' in D2

    targets = D1['attributes']
    # Names are captured once, before any attribute is appended below.
    known_names = [entry['name'] for entry in targets]

    for incoming in D2['attributes']:
        label = incoming['name']
        if label not in known_names:
            targets.append(incoming)
            continue

        logger.debug(f'Config attribute {label} is in the dotfile')
        existing = targets[known_names.index(label)]
        both_nested = 'attributes' in existing and 'attributes' in incoming
        if both_nested:
            mwalk(existing, incoming)
        else:
            existing.update(incoming)
def dwalk(D: dict, I: dict):
    """
    Validate the user's config dict ``I`` against the specification ``D`` and
    fill in defaults, recursing into nested attributes.

    ``I`` is modified in place: attributes the user left out receive their
    default (where the specification gives one), and attributes the user did
    supply are checked and, for case-insensitive strings, case-folded.

    Parameters
    ----------
    D : dict
        The attribute specification dictionary to walk through.  It must
        have an ``'attributes'`` list whose items each have ``'name'`` and
        ``'type'`` keys.
    I : dict
        The user's config dictionary to be processed.

    Raises
    ------
    ValueError
        Raised directly if ``D`` has no ``'attributes'`` or ``I`` is None.
        Handed to ``raise_clean`` if ``I`` has a key that is not a known
        attribute name, if a ``dict`` attribute was given a non-dict value,
        if a required scalar is missing, or if a string is not one of the
        allowed ``'choices'``.
    TypeError
        Handed to ``raise_clean`` if a supplied ``tuple`` attribute declares
        subattributes.
    """
    dname = D.get("name", "root")
    if 'attributes' not in D:
        raise ValueError(f'Attribute {dname} has no attributes; cannot walk through it.')
    # names of the config attributes defined at this level
    tld = [x['name'] for x in D['attributes']]
    if I is None:
        raise ValueError(f'Null dictionary found; expected a dict with key(s) {tld} under \'{dname}\'.')

    # Every key in the user's config must match an attribute name.
    for u in list(I.keys()):
        if u not in tld:
            raise_clean(ValueError(f'Attribute \'{u}\' invalid; expecting one of {tld} under \'{dname}\'.'))

    for dx in D['attributes']:
        d = dx['name']
        typ = dx['type']
        if d in I:
            if typ == 'dict' and not isinstance(I[d], dict):
                raise_clean(ValueError(f'Attribute \'{d}\' of \'{dname}\' must be a dict; found {type(I[d])}.'))
            _check_supplied_attribute(dx, I, dname)
        else:
            _fill_missing_attribute(dx, I, dname)


def _fill_missing_attribute(dx: dict, I: dict, dname: str):
    """Give the attribute described by ``dx``, absent from ``I``, its default value."""
    d = dx['name']
    typ = dx['type']

    if typ in ('str', 'int', 'float', 'bool', 'tuple'):
        if 'default' in dx:
            I[d] = dx['default']
        elif dx.get('required'):
            # flagged as required but neither supplied nor defaulted
            raise_clean(ValueError(f'Attribute \'{d}\' of \'{dname}\' requires a value.'))
        return

    if typ not in ('dict', 'list'):
        return
    # A container explicitly tagged as not required is left out entirely.
    if 'required' in dx and not dx['required']:
        return

    if typ == 'list':
        I[d] = dx.get('default', [])
    elif 'attributes' in dx:
        # Start empty and keep walking so that all descendants get defaults.
        I[d] = {}
        dwalk(dx, I[d])
    else:
        I[d] = dx.get('default', {})


def _check_supplied_attribute(dx: dict, I: dict, dname: str):
    """Validate and normalise the value the user supplied for the attribute described by ``dx``."""
    d = dx['name']
    typ = dx['type']

    if typ == 'str':
        case_sensitive = dx.get('case_sensitive', True)
        if not case_sensitive:
            I[d] = I[d].casefold()
        if 'choices' in dx:
            choices = dx['choices']
            if case_sensitive:
                valid = I[d] in choices
                qualifier = ''
            else:
                valid = I[d].casefold() in [c.casefold() for c in choices]
                qualifier = ' (case-insensitive)'
            if not valid:
                # Name the enclosing attribute (dname), not the attribute itself.
                raise_clean(ValueError(
                    f'Attribute \'{d}\' of \'{dname}\' must be one of '
                    f'{", ".join(choices)}{qualifier}; found \'{I[d]}\''))
    elif typ == 'dict':
        if 'attributes' in dx:
            # process descendants
            dwalk(dx, I[d])
        else:
            I[d] = special_update(dx.get('default', {}), I[d])
    elif typ == 'list':
        if 'attributes' in dx:
            # process list-item children
            lwalk(dx, I[d])
        else:
            # defaults come first, followed by the user's items
            I[d] = dx.get('default', []) + I[d]
    elif typ == 'tuple':
        if 'attributes' in dx:
            raise_clean(TypeError(f'Attribute \'{d}\' of \'{dname}\' cannot have subattributes.'))
        # The user's value is kept; the default applies only when the
        # attribute is absent.
def lwalk(D: dict, L: list[dict]):
    """
    Process each item of the list ``L`` against the list-attribute
    specification ``D``.

    Each item is identified by its first key, which must be the name of one
    of ``D['attributes']``.  Items whose specification has type ``'dict'``
    are handed to ``dwalk`` (an empty value is first replaced by ``{}``);
    items of any other type are only noted in the debug log.

    Parameters
    ----------
    D : dict
        The attribute specification dictionary.
    L : list of dict
        The list of dictionary items to be processed against D.
    """
    assert 'attributes' in D
    specs = D['attributes']
    names = [spec['name'] for spec in specs]
    scalar_types = ('str', 'int', 'float')

    for entry in L:
        # only the first key of an item identifies it
        key = [*entry.keys()][0]
        if key not in names:
            raise_clean(ValueError(f'Element \'{key}\' of list \'{D["name"]}\' is not valid; expected one of {names}'))
        spec = specs[names.index(key)]
        kind = spec['type']

        if kind == 'dict':
            if not entry[key]:
                entry[key] = {}
            dwalk(spec, entry[key])
        elif kind in scalar_types:
            # A list attribute indicates an ordered sequence of tasks, and
            # each task is expected to be a dictionary rather than a single
            # scalar value, so scalar elements are ignored.
            logger.debug(f'Warning: Scalar list-element-attribute \'{spec}\' in \'{spec["name"]}\' ignored.')
        else:
            logger.debug(f'Warning: List-element-attribute \'{key}\' in \'{spec["name"]}\' ignored.')