Source code for yapyutils.config.capabilities

# -*- coding: utf-8 -*-
"""*yapyutils.config.capabilities* provides simple hierarchical configuration
file utils based on in-memory JSON representation. This includes nested and
chained configuration data branches.

The provided features are designed to provide some powerful features of structured
configuration based on basic standard libraries, thus providing advanced setup
support utilities for components of the lower software stack.

The higher layer software components should prefer more versatile and powerful
libraries such as **multiconf**.

The package *multiconf* provides tools for the conversion of various data formats
into *JSON*. This could be used to apply e.g. data type definitions supporting
inline comments - which JSON lacks.

"""

import os

from yapyutils.config import YapyUtilsConfigError
import yapydata.datatree.synjson



__author__ = 'Arno-Can Uestuensoez'
__license__ = "Artistic-License-2.0 + Forced-Fairplay-Constraints"
__copyright__ = "Copyright (C) 2019 Arno-Can Uestuensoez" \
                " @Ingenieurbuero Arno-Can Uestuensoez"
__version__ = '0.1.1'
__uuid__ = "60cac28d-efe6-4a8d-802f-fa4fc94fa741"

__docformat__ = "restructuredtext en"


#: default alternate search path locations for configuration files
_SPATH = (
    os.curdir,
    os.path.dirname(__file__) + os.sep + 'builder' + os.sep,
)


#: known suffixes
SUFFIXES = (
    'yaml',        # standard yaml 
    'json',        # standard json
    'xml',         # xml.etree
    'inix',        # configparser + custom
    'ini',         # configparser + custom
    'cfg',         # configparser + custom
    'conf',        # configparser + custom
    'properties',  # configparser + custom
)

[docs]class YapyUtilsCapabilityError(YapyUtilsConfigError): """Common access error. """ pass
[docs]class YapyUtilsCapabilityOidError(YapyUtilsCapabilityError): """Requested object name is not present. """ pass
_debug = 0 _verbose = 0 # helper for multi-break class _FileFound(Exception): pass
[docs]class Capability(object): """Provides JSON based read-only configuration of capabilities. This in particular comprises the priority based readout of values and defaults. The structure hereby includes specialization by subcomponents, where the missing value will be tried from the more general enclosing super component. The access to structured data trees offers various method to access paths of nested node attributes. This comprises the creation as well as the readout. The following equivalent creation methods are supported, where 'treenode' could be either the root node, or any subordinated branch:: treenode['subnode0']['subnode1']['subnode7'] = value # dynamic items value = treenode( 'subnode0', 'subnode1', 'subnode7', create=True, ) # dynamic items by '__call__' value = treenode.subnode0.subnode1.subnode7 # static attribute addressing style The following equivalent readout methods are supported, where 'treenode' could be either the root node, or any subordinated branch:: value = treenode['subnode0']['subnode1']['subnode7'] # dynamic items value = treenode('subnode0', 'subnode1', 'subnode7') # dynamic items by '__call__' value = treenode.subnode0.subnode1.subnode7 # static attribute addressing style """ M_FIRST = 1 # use first matching node M_LAST = 2 # use last matching node M_ALL = 3 # use all - iterate all matches match_map = { M_FIRST: 1, M_LAST: 2, M_ALL: 3, 'first': 1, 'last': 2, 'all': 3, }
[docs] def __init__(self, data={}): """ Args: data: Configuration data:: data := ( <dict> # in-memory JSON structure <file-path-name> # persistent JSON data ) Returns: None / initialized object Raises: YapyUtilsCapabilityError pass-through """ self.data = data if not isinstance(self.data, (dict,)): raise YapyUtilsCapabilityError( "top 'node' must be a 'dict' == JSON object, got: " + str(self.data) )
[docs] def __getitem__(self, key): """Gets the value of the path within the data. Args: key: The value of the node within *data*:: key := ( <single-key> | <list-of-keys> | <tuple-of-keys> ) Returns: The value of the addressed node/value. Raises: pass-through """ if type(key) in (list, tuple): res = self.data for k in key: res = res[k] return res return self.data[key]
[docs] def __setitem__(self, key, val): """Sets the value of the path within the data. Args: key: The value of the node within *data*:: key := ( <single-key> | <list-of-keys> | <tuple-of-keys> ) val: The node/value to be set at the addressed path. Non present values are created, present are replaced. REMARK: In case of lists the value of the key for a non-present value has to be the increment of the highest present key. Sparse lists are not supported. Returns: The value of the addressed node/value. Raises: pass-through """ if isinstance(key, (list, tuple)): res = self.data last = 0 for k in key: if isinstance(res, (list, tuple)): # treat tuple equal until it raises if k < len(res): res = res[k] elif k == len(res): if last == 1: res[-1] = type() res.append(None) last = 1 res[key[-1]] = val return self.data[key] = val
[docs] def __setattr__(self, name, value): """Validates types of own data attributes. Args: name: Name of the attribute. Following are reserved and treated special: * type: str - 'data' The value is treated as the replacement of the internal data attribute. Replaces or creates the complete data of teh current instance. value: The value of the attribute. This by default superposes present values by replacement. Non-present are created. Returns: Raises: YapyUtilsCapabilityError """ if name == 'data': # # replacement of current managed data # if not isinstance(value, dict): raise YapyUtilsCapabilityError( "value must be a 'dict' == JSON object, got: " + str(type(value)) ) self.__dict__[name] = value else: # # any standard attribute with standard behavior # return object.__setattr__(self, name, value)
[docs] def __getattr__(self, name): """ Args: Returns: Raises: """ if name == 'data': return self.__dict__[name] elif isinstance(name, tuple): return self.__getattr__(name) else: # return self.__dict__[name] # return object.__getattr__(name) return super(Capability, self).__getattr__(name)
[docs] def __call__(self, *subpath, **kargs): """Readout the value of a node, or an attribute. The name binding of the path is provided as a tuple of path items. Args: subpath: The list of keys constituting a branch of a data tree. The *subpath* is treated as a branch of one of the nodes of a provided *searchpath* - which is by default the top node. The supported values are:: subpath := <list-of-node-ids> <list-of-node-ids> := <node-id> [',' <list-of-node-ids>] node-id := ( str # strings as keys - objects/dict only | int # integers as index - lists only | tuple | list # set as choice of valid literal path items | <regexpr> ) regexpr := <compiled-python-regular-expression> compiled-python-regular-expression := re.compile(<regexpr>) regexpr := "regular expression" kargs: searchpath: Optional search path for the match of the provided address *subpath*. The provided *subpath* is applied to each node of the *searchpath* in accordance to the *direction* option. This provides the search and enumeration of side branches:: searchpath := <path-item-list> path-item-list := <path-item> [, <path-item-list>] path-item := ( str # item name | int # item index ) default := <top-node> The search path entries has to be actually present by default. These could be either created by loading a complete tree structure, or by using the *Capabilities.create()* member. See also parameter 'strict'. direction: The search direction of the *subpath* within the *searchpath*. In case of multiple superpositioned attributes the first traversed match. The provided values are:: direction := ( up | 0 | False # search from right-to-left | down | 1 | True # search from left-to-right ) default:= up match: Sets the match criteria for the search operation. Interferes with *direction*:: match := ( M_FIRST | 'first' # use first matching node | M_LAST | 'last' # use last matching node | M_ALL | 'all' # use all - iterate all matches ) default := M_FIRST partial: Enables the return of partial sub paths in case the requested path is not completely present. :: partial := ( True # when not completely present, the longest # existing part is returned, the completeness # is provided by the result attribute <partial> | False # when not completely present an exception # is raised ) strict: Controls the required consistency. This comprises: 1. the presence of the search path entries 2. the presence of the requested subpath within the set of search paths Returns: In case of a match returns the tuple:: return := (<attr-value-path>, <attr-value>, <partial>) attr-value-path := ( "the list of keys of the top-down path" | "empty list when no item exists" # see <complete> ) attr-value := "value of the targeted node/attribute" partial := ( False # the complete requested path | True # the actually present part of the path ) Else raises *YapyUtilsCapabilityOidError*. Raises: YapyUtilsCapabilityOidError pass-through """ _srch = kargs.get('searchpath', ()) _dir = kargs.get('direction', 0) _match = kargs.get('match', Capability.M_FIRST) if not isinstance(_srch, (tuple, list,)): raise YapyUtilsCapabilityError( "search path requires 'tuple' or'list', got: " + str(_srch) ) # # match criteria # try: _match = self.match_map[_match] except IndexError: try: _match = self.match_map[str(_match).lower()] except KeyError: raise YapyUtilsCapabilityError( "valid match are (first, %d, last, %d, all, %d,), got: %s" %( Capability.M_FIRST, Capability.M_LAST, Capability.M_ALL, str(_match) ) ) # # search direction # if _dir in (True, False,): pass else: _dir = str(_dir).lower() if _dir in ('up', '0',): _dir = False elif _dir in ('down', '1',): _dir = True else: raise YapyUtilsCapabilityError( "valid directions are (up, 0, down, 1), got: " + str(_dir) ) # collect the nodes on the searchpath _path_nodes = [self.data,] _cur = self.data if _srch: for x in _srch: try: _cur = _cur[x] except (IndexError, KeyError, TypeError): raise YapyUtilsCapabilityOidError( "invalid search path: %s\n see: %s\n" %( str(_srch), str(x) ) ) _path_nodes.append(_cur) # revert for bottom-up search direction if not _dir: # upward - up | 0 | False _path_nodes = reversed(_path_nodes) # now search the subpath for each node of the search path # first match wins for _pn in _path_nodes: _cur = _pn _idx_subpath = 0 # reset here for x in subpath: _excep = False try: _cur = _cur[x] except (IndexError, KeyError, TypeError): # continue with next level - only when nodes do not fit _cur = None _excep = True break if not _excep: break # has hit a regular match if _excep: # no match raise YapyUtilsCapabilityOidError( "Missing subpath hook" "\n searchpath: %s" "\n subpath hook: %s" "\n subpath: %s\n" %( str(_srch), str(subpath[_idx_subpath - 1]), str(subpath) ) ) return _cur
# def __str__(self): # res = '' # return res
[docs] def create(self, *subpath, **kargs): """Creates a subpath to a given node, default is from top. Args: subpath: The list of keys constituting a branch of a data tree. The *subpath* is treated as a branch of one of the nodes of a provided *searchpath* - which is by default the top node. The supported values are:: subpath := <list-of-node-ids> <list-of-node-ids> := <node-id> [',' <list-of-node-ids>] node-id := ( str # strings as keys - objects/dict only | int # integers as index - lists only | tuple | list # set as choice of valid literal path items | <regexpr> ) regexpr := <compiled-python-regular-expression> compiled-python-regular-expression := re.compile(<regexpr>) regexpr := "regular expression" kargs: hook: Optional node as parent of the insertion point for the new sub path. The node must exist. :: hook := ( ) search path for the match of the provided address *subpath*. The provided *subpath* is applied to each node of the *searchpath* in accordance to the *direction* option. Default := <top-node> create: Create missing entities of the requested path. The provided value is the value of the final node:: create := <value-of-node> value-of-node := <valid-json-node-type> valid-json-node-type := ( int | float | str # unicode | dict | list | None | True | False # equivalent: null|true|false ) default := None direction: The search direction of the *subpath* within the *searchpath*. In case of multiple superpositioning attributes the first match of traversion. The provided values are:: direction := ( up | 0 | False # search from right-to-left | down | 1 | True # search from left-to-right ) Default:= up match: Sets the match criteria for the search operation. Interferes with *direction*:: match := ( M_FIRST # use first matching node | M_LAST # use last matching node | M_ALL # use all - iterate all matches ) default := Returns: In case of a match returns the tuple:: return := (<attr-value-path>, <attr-value>) attr-value-path := "the list of keys of the top-down path" attr-value := "value in accordance to the type of the attribute" Else raises *YapyUtilsCapabilityOidError*. Raises: YapyUtilsCapabilityOidError pass-through """ _srch = kargs.get('searchpath', ()) _dir = kargs.get('direction', 0) try: _create_value = kargs['create'] _create = True except KeyError: _create_value = None _create = False if not isinstance(_srch, (tuple, list,)): raise YapyUtilsCapabilityError( "searchpath requires 'tuple' or'list', got: " + str(_srch) ) if _dir in (True, False,): pass else: _dir = str(_dir).lower() if _dir in ('up', '0',): _dir = False elif _dir in ('down', '1',): _dir = True else: raise YapyUtilsCapabilityError( "valid directions are (up, 0, down, 1), got: " + str(_dir) ) # collect the nodes on the searchpath _path_nodes = [self.data,] _cur = self.data if _srch: for x in _srch: try: _cur = _cur[x] except (IndexError, KeyError, TypeError): raise YapyUtilsCapabilityOidError( "invalid searchpath: %s\n see: %s\n" %( str(_srch), str(x) ) ) _path_nodes.append(_cur) # revert for bottom-up search direction if not _dir: # upward - up | 0 | False _path_nodes = reversed(_path_nodes) # now search the subpath for each node of the search path # first match wins for _pn in _path_nodes: _cur = _pn _idx_subpath = 0 # reset here for x in subpath: _excep = False try: _cur = _cur[x] except (IndexError, KeyError, TypeError): # continue with next level - only when nodes do not fit _cur = None _excep = True break if not _excep: break # has hit a regular match if _excep: # no match raise YapyUtilsCapabilityOidError( "Missing subpath hook" "\n searchpath: %s" "\n subpath hook: %s" "\n subpath: %s\n" %( str(_srch), str(subpath[_idx_subpath - 1]), str(subpath) ) ) return _cur
[docs] def get(self, *key): """Gets a value from data. Args: Returns: Raises: """ try: return self(*key) except: return None
[docs] def addfile(self, fpname, key=None, node=None, **kargs): """Superposes a configuration file to the existing entries. Updates and creates granular items. The a applied traversing algorithm is:: 1. create non-existing branches and leafs 2. replace existing leafs 3. traverse existing leafs for new branches and existing leafs Args: fpname: see *readfile()* key: see *readfile()* node: see *readfile()* kargs: path: see *readfile()* striproot: see *readfile()* suffixes: see *readfile()* Returns: Reference to updated data structure. Raises: YapyUtilsConfigError pass-through """ jval = self.readfile(fpname, nodata=True, **kargs) _p = yapydata.datatree.synjson.DataTreeJSON(self.data) _p.join(jval) self.data = _p.data return self.data
[docs] def readfile(self, fpname, key=None, node=None, **kargs): """Reads a JSON file. This is a simple basic method for the application on the lower layers of the software stack. It is designed for minimal dependencies. The used standard libraries support the syntaxes:: INI, JSON, .properties, XML, YAML The data is by not validated. If this is required external higher layer tools such as *multiconf* could be applied. The *readfile()* simply replaces existing configuration data, for the iterative update see *addfile()*. Args: fpname: File path name of the configuration file. Alternate relative names could be provided, where optionally additional parameters are used for search and file suffixes, see other parameters:: <path>/<fpname>[<suffix>] key: The key for the insertion point:: node[key] = <file-data> default := None - replace self.data, The caller is responsible for the containment of the provided node within the data structure represented by this object. No checks are performed. node: The node for the insertion of the read data.:: default := <top> kargs: nodata: Prohibit insertion to *self.data*, returns the data reference only. :: default := False path: Alternate list of search paths:: path := <list-of-search-paths> list-of-search-paths := <spath> [, <list-of-search-paths>] spath := "path-prefix" striproot: A special option for *XML* files. When *True* suppresses the mandatory named root node of *XML*. Thus provides a similar result as *JSON* for better merge and internal processing. Other parsers simply ignore this option. suffixes: Suffixes as preferences for configuration file type:: suffixes := '[' <list-of-preferences> ']' list-of-preferences := <pref> [, <list-of-preferences>] pref := ( # the order defines the search and usage priority 'yaml' | 'json' | 'xml' | 'inix' | 'ini' | 'cfg' | 'properties' ) Returns: Reference to read data structure. Raises: YapyUtilsConfigError pass-through """ _nodata = kargs.get('nodata') _app = kargs.get('app', '') _suffixes = kargs.get('suffixes', SUFFIXES) _spath = kargs.get('path', _SPATH) _parser = None if not os.path.isfile(fpname): try: for s in _spath: for p in _suffixes: if p[0] != '.': p = '.' + p if os.path.isabs(fpname) and os.path.exists(fpname + p): datafile = os.path.abspath(fpname + p) _parser = p raise _FileFound elif os.path.isabs(fpname) and os.path.exists(fpname + 'capabilities' + p): datafile = os.path.abspath(fpname + os.sep + 'capabilities' + p) _parser = p raise _FileFound elif os.path.exists(s + os.sep + fpname + p): datafile = os.path.abspath(s + os.sep + fpname + p) _parser = p raise _FileFound elif os.path.exists(s + os.sep + fpname + os.sep + 'capabilities' + p): datafile = os.path.abspath(s + os.sep + fpname + os.sep + 'capabilities' + p) _parser = p raise _FileFound except _FileFound: pass else: raise YapyUtilsConfigError("Missing configuration file: " + str(fpname)) else: datafile = fpname if _parser == None: _parser = os.path.splitext(datafile)[1] if _parser == '.yaml': from yapydata.datatree.synyaml import DataTreeYAML as Parser # @UnusedImport elif _parser == '.xml': from yapydata.datatree.synxml import DataTreeXML as Parser # @UnusedImport @Reimport elif _parser == '.json': from yapydata.datatree.synjson import DataTreeJSON as Parser # @UnusedImport @Reimport elif _parser in ('.ini', '.inix', '.cfg', '.conf', ): from yapydata.datatree.synini import DataTreeINI as Parser # @Reimport elif _parser in ('.properties', ): raise NotImplemented("not yet available") jval = Parser().import_data(datafile, key, node, **kargs) if key and node == None: raise YapyUtilsConfigError("Given key(%s) requires a valid node." % (str(key))) if key: node[key] = jval elif not _nodata: self.data = jval return jval