Skip to content

Utilities API

Logging, unit handling, and data utility functions.

Logging

hera.utils.logging

Unit Handler

hera.utils.unitHandler

Unit handling module for Hera.

Pint is the preferred unit system. Use ureg for all new code. Unum is supported as a fallback for backward compatibility with older code.

Usage::

from hera.utils.unitHandler import ureg, Quantity

speed = 5.0 * ureg.m / ureg.s
area = 10 * ureg.dunam          # custom: 1 dunam = 1000 m²

tonumber(x, theunit)

Convert a value with units to a plain number in the given unit.

Parameters:

Name Type Description Default
x Unum, Quantity, or numeric

The value to convert.

required
theunit unit

The target unit.

required

Returns:

Type Description
float
Source code in hera/utils/unitHandler.py
def tonumber(x, theunit):
    """
    Convert a value carrying units into a plain number expressed in ``theunit``.

    Parameters
    ----------
    x : Unum, Quantity, or numeric
        The value to convert.
    theunit : unit
        The target unit.

    Returns
    -------
    float
    """
    if unumSupport and isinstance(x, Unum):
        return x.asNumber(theunit)
    if isinstance(x, Quantity):
        return x.to(theunit).magnitude
    # Already a bare number: pass it through untouched.
    return x

tounit(x, theunit)

Convert a value to the given unit, handling both pint and unum.

Parameters:

Name Type Description Default
x Unum, Quantity, or numeric

The value to convert.

required
theunit Unit, Unum, or str

The target unit.

required

Returns:

Type Description
Quantity or Unum
Source code in hera/utils/unitHandler.py
def tounit(x, theunit):
    """
    Convert a value to the requested unit, supporting both pint and unum.

    Parameters
    ----------
    x : Unum, Quantity, or numeric
        The value to convert.
    theunit : Unit, Unum, or str
        The target unit.

    Returns
    -------
    Quantity or Unum
    """
    logger = get_logger(None, "hera.utils.tounit")

    if unumSupport and isinstance(x, Unum):
        logger.warning("Please prefer using pint (ureg) for units")
        if isinstance(theunit, Unit):
            return unumToPint(x).to(theunit)
        if isinstance(theunit, Unum):
            return x.asUnit(theunit)
        logger.error(f"can't convert {x} to units {theunit}")
        return x

    if isinstance(x, Quantity):
        if isinstance(theunit, Unum) and unumSupport:
            logger.warning("Please prefer using pint (ureg) for units")
            return pintToUnum(x).asUnit(theunit)
        if isinstance(theunit, (Unit, str)):
            return x.to(theunit)
        logger.error(f"can't convert {x} to units {theunit}")
        return x

    # Bare number: wrap it in a pint Quantity carrying the requested unit.
    return Quantity(x, theunit)

convert_unum_units_to_eval_str(unit_str)

Convert a unum-style unit string to an eval-safe expression.

Source code in hera/utils/unitHandler.py
@deprecated(reason="Doesn't work for some cases. Move to Pint")
def convert_unum_units_to_eval_str(unit_str):
    """Rewrite a unum-style unit string as an eval-safe Python expression.

    Trailing integer exponents become explicit power syntax, e.g.
    ``m2`` -> ``m**2`` and ``s-1`` -> ``s**-1``.
    """
    exponent_pattern = re.compile(r'([a-zA-Z]+)(-?\d+)')
    return exponent_pattern.sub(lambda m: f"{m.group(1)}**{m.group(2)}", unit_str)

unumToStr(obj)

Convert a Unum object to a string representation.

Source code in hera/utils/unitHandler.py
@deprecated(reason="Doesn't work for some cases")
def unumToStr(obj):
    """Return a string representation of *obj*, normalising Unum values."""
    if not isinstance(obj, Unum):
        return str(obj)
    # Normalise "<number> [<units>]" into "<number>*<units>" form.
    normalised = convert_unum_units_to_eval_str(str(obj))
    return normalised.replace(" [", "*").replace("]", "")

strToUnum(value)

Convert a string to a Unum object.

Source code in hera/utils/unitHandler.py
@deprecated(reason="Doesn't work for some cases")
def strToUnum(value):
    """Convert a string to a Unum object.

    Already-Unum values are returned unchanged.  Other values are evaluated
    as Python expressions (e.g. ``"1*m/s"``); anything that fails to
    evaluate is returned as-is.

    Parameters
    ----------
    value : Unum or str
        The value to convert.

    Returns
    -------
    Unum, or the original value when conversion is impossible.
    """
    if isinstance(value, Unum):
        return value
    try:
        # SECURITY NOTE: eval on arbitrary input is unsafe; only use on
        # trusted configuration strings.
        # The previous bare ``except:`` also swallowed SystemExit and
        # KeyboardInterrupt; narrowed to Exception.
        return eval(str(value))
    except Exception:
        return value

extractUnumUnitsFromPint(pint_quantity)

Extract unum unit equivalent from a pint Quantity.

Source code in hera/utils/unitHandler.py
def extractUnumUnitsFromPint(pint_quantity):
    """Build the Unum unit object equivalent to a pint Quantity's unit part."""
    result = 1 * _m / _m  # dimensionless starting point
    for name, exponent in pint_quantity._units.items():
        if name not in PINT_TO_UNUM_MAP:
            raise ValueError(f"Unit '{name}' not mapped to Unum.")
        result *= PINT_TO_UNUM_MAP[name] ** exponent
    return result

pintToUnum(pint_quantity)

Convert a pint Quantity to a Unum object.

Parameters:

Name Type Description Default
pint_quantity Quantity

The pint value to convert.

required

Returns:

Type Description
Unum
Source code in hera/utils/unitHandler.py
def pintToUnum(pint_quantity):
    """
    Convert a pint Quantity to a Unum object.

    Parameters
    ----------
    pint_quantity : Quantity
        The pint value to convert.

    Returns
    -------
    Unum

    Raises
    ------
    ValueError
        If one of the quantity's units has no Unum equivalent.
    """
    # Delegate the unit translation to extractUnumUnitsFromPint instead of
    # duplicating its mapping loop here.
    return pint_quantity.magnitude * extractUnumUnitsFromPint(pint_quantity)

unumToPint(unum_obj, value=1.0)

Convert a Unum object to a pint Quantity.

Parameters:

Name Type Description Default
unum_obj Unum

The unum value to convert.

required
value float

Optional numerical value.

1.0

Returns:

Type Description
Quantity
Source code in hera/utils/unitHandler.py
def unumToPint(unum_obj, value=1.0):
    """
    Convert a Unum object to a pint Quantity.

    Parameters
    ----------
    unum_obj : Unum
        The unum value to convert.
    value : float
        Optional numerical value.

    Returns
    -------
    Quantity
    """
    # Already a pint Quantity: nothing to do.
    if isinstance(unum_obj, Quantity):
        return unum_obj
    # Render the unum as text, then rewrite it into a pint-parsable expression.
    expression = convert_unum_units_to_eval_str(str(unum_obj))
    return value * ureg.parse_expression(expression)

unumToBaseUnits(unum_obj)

Convert a Unum object to MKS base units.

Source code in hera/utils/unitHandler.py
def unumToBaseUnits(unum_obj):
    """Return *unum_obj* converted to MKS (SI base) units."""
    # Round-trip through pint, which knows how to reduce to base units.
    return pintToUnum(unumToPint(unum_obj).to_base_units())

JSON Utilities

hera.utils.jsonutils

Unum

Placeholder when unum is not installed.

Source code in hera/utils/unitHandler.py
class Unum:
    """Placeholder when unum is not installed."""
    # Defined so that isinstance(x, Unum) checks elsewhere in the module
    # remain valid even without the optional `unum` dependency; no instance
    # of this placeholder is ever created.
    pass

JSONvariationItem

An iterator that creates the variations of a single parameter block.

Each group is a list of parameters. { p1 : list of values p2 : list of values

}

where p1, p2 are json paths. (If $. is not specified in the path, add them).

Source code in hera/utils/jsonutils.py
class JSONvariationItem:
    """
        An iterator that creates the variations of a single parameter block.

        Each group is a list of parameters.
        {
            p1 : list of values
            p2 :  list of values

        }

        where p1, p2 are json paths. (If $. is not specified in the path, add them).
    """

    base = None            # the base JSON that the variations are applied to
    variationItem = None   # map of json path -> list of values

    _curIter = None        # index of the next variation to emit
    _itemCount = None      # number of variations (length of each value list)

    def __init__(self, base, variationItem, convetToBaseUnits=True):
        """
            The base is the json that will be changed.

            variationItem is a map of json paths -> values. All the paths change together.
        Parameters
        ----------
        base
        variationItem
        convetToBaseUnits: bool
            If true, and the item is pint.Quantity, then convert to base (mks) units.

        Raises
        ------
        ValueError
            If the value lists of the different paths have different lengths.
        """
        logger = get_classMethod_logger(self, name="init")

        self.base = base
        self._itemCount = None
        for key, value in variationItem.items():
            if self._itemCount is None:
                self._itemCount = len(value)
                logger.debug(f"Got {self._itemCount} items in key {key}. Now have to make sure that it equal to all keys ")
            elif len(value) != self._itemCount:
                err = f"The key {key} does not have the right number of values. Got {len(value)}, and should be {self._itemCount}"
                logger.error(err)
                raise ValueError(err)

        # BUGFIX: an empty variationItem used to leave _itemCount as None,
        # which made __next__ fail with a TypeError (None-1); treat it as
        # zero variations instead.
        if self._itemCount is None:
            self._itemCount = 0

        self.variationItem = variationItem
        self._curIter = 0

    def __iter__(self):
        """Return the iterator object (self)."""
        return self

    def __next__(self):
        """Return the next variation mapping or raise StopIteration."""
        if self._curIter >= self._itemCount:
            raise StopIteration

        result = {key: values[self._curIter] for key, values in self.variationItem.items()}
        self._curIter += 1
        return result
__init__(base, variationItem, convetToBaseUnits=True)
The base is the json that will be changed.

variationItem is a map of json paths -> values. All the paths change together.

Parameters:

Name Type Description Default
base
required
variationItem
required
convetToBaseUnits

If true, and the item is pint.Quantity, then convety to base (mks) units.

True
Source code in hera/utils/jsonutils.py
def __init__(self, base, variationItem, convetToBaseUnits=True):
    """
        The base is the json that will be changed.

        variationItem is a map of json paths -> values. All the paths change together.
    Parameters
    ----------
    base
    variationItem
    convetToBaseUnits: bool
        If true, and the item is pint.Quantity, then convert to base (mks) units.

    Raises
    ------
    ValueError
        If the value lists of the different paths have different lengths.
    """
    logger = get_classMethod_logger(self, name="init")

    self.base = base
    self._itemCount = None
    for key, value in variationItem.items():
        if self._itemCount is None:
            self._itemCount = len(value)
            logger.debug(f"Got {self._itemCount} items in key {key}. Now have to make sure that it equal to all keys ")
        elif len(value) != self._itemCount:
            err = f"The key {key} does not have the right number of values. Got {len(value)}, and should be {self._itemCount}"
            logger.error(err)
            raise ValueError(err)

    # BUGFIX: an empty variationItem used to leave _itemCount as None,
    # which made __next__ fail with a TypeError (None-1); treat it as
    # zero variations instead.
    if self._itemCount is None:
        self._itemCount = 0

    self.variationItem = variationItem
    self._curIter = 0
__iter__()

Return the iterator object (self).

Source code in hera/utils/jsonutils.py
def __iter__(self):
    """Return the iterator object (self)."""
    # Iteration state lives on the instance itself, so the object is its
    # own (single-pass) iterator.
    return self
__next__()

Return the next variation mapping or raise StopIteration.

Source code in hera/utils/jsonutils.py
def __next__(self):
    """Return the next variation mapping or raise StopIteration."""
    if self._curIter >= self._itemCount:
        raise StopIteration

    current = self._curIter
    self._curIter = current + 1
    # One value per json path, all taken at the same position.
    return {key: values[current] for key, values in self.variationItem.items()}

get_logger(instance, name=None)

Return a class logger for the instance, or a named logger if name is given.

Source code in hera/utils/logging/helpers.py
def get_logger(instance, name=None):
    """Return a class logger for the instance, or a named logger if name is given."""
    if name is not None:
        return logging.getLogger(name)
    return getClassLogger(instance.__class__)

tonumber(x, theunit)

Convert a value with units to a plain number in the given unit.

Parameters:

Name Type Description Default
x Unum, Quantity, or numeric

The value to convert.

required
theunit unit

The target unit.

required

Returns:

Type Description
float
Source code in hera/utils/unitHandler.py
def tonumber(x, theunit):
    """
    Convert a value carrying units into a plain number expressed in ``theunit``.

    Parameters
    ----------
    x : Unum, Quantity, or numeric
        The value to convert.
    theunit : unit
        The target unit.

    Returns
    -------
    float
    """
    if unumSupport and isinstance(x, Unum):
        return x.asNumber(theunit)
    if isinstance(x, Quantity):
        return x.to(theunit).magnitude
    # Already a bare number: pass it through untouched.
    return x

tounit(x, theunit)

Convert a value to the given unit, handling both pint and unum.

Parameters:

Name Type Description Default
x Unum, Quantity, or numeric

The value to convert.

required
theunit Unit, Unum, or str

The target unit.

required

Returns:

Type Description
Quantity or Unum
Source code in hera/utils/unitHandler.py
def tounit(x, theunit):
    """
    Convert a value to the requested unit, supporting both pint and unum.

    Parameters
    ----------
    x : Unum, Quantity, or numeric
        The value to convert.
    theunit : Unit, Unum, or str
        The target unit.

    Returns
    -------
    Quantity or Unum
    """
    logger = get_logger(None, "hera.utils.tounit")

    if unumSupport and isinstance(x, Unum):
        logger.warning("Please prefer using pint (ureg) for units")
        if isinstance(theunit, Unit):
            return unumToPint(x).to(theunit)
        if isinstance(theunit, Unum):
            return x.asUnit(theunit)
        logger.error(f"can't convert {x} to units {theunit}")
        return x

    if isinstance(x, Quantity):
        if isinstance(theunit, Unum) and unumSupport:
            logger.warning("Please prefer using pint (ureg) for units")
            return pintToUnum(x).asUnit(theunit)
        if isinstance(theunit, (Unit, str)):
            return x.to(theunit)
        logger.error(f"can't convert {x} to units {theunit}")
        return x

    # Bare number: wrap it in a pint Quantity carrying the requested unit.
    return Quantity(x, theunit)

convert_unum_units_to_eval_str(unit_str)

Convert a unum-style unit string to an eval-safe expression.

Source code in hera/utils/unitHandler.py
@deprecated(reason="Doesn't work for some cases. Move to Pint")
def convert_unum_units_to_eval_str(unit_str):
    """Rewrite a unum-style unit string as an eval-safe Python expression.

    Trailing integer exponents become explicit power syntax, e.g.
    ``m2`` -> ``m**2`` and ``s-1`` -> ``s**-1``.
    """
    exponent_pattern = re.compile(r'([a-zA-Z]+)(-?\d+)')
    return exponent_pattern.sub(lambda m: f"{m.group(1)}**{m.group(2)}", unit_str)

unumToStr(obj)

Convert a Unum object to a string representation.

Source code in hera/utils/unitHandler.py
@deprecated(reason="Doesn't work for some cases")
def unumToStr(obj):
    """Return a string representation of *obj*, normalising Unum values."""
    if not isinstance(obj, Unum):
        return str(obj)
    # Normalise "<number> [<units>]" into "<number>*<units>" form.
    normalised = convert_unum_units_to_eval_str(str(obj))
    return normalised.replace(" [", "*").replace("]", "")

strToUnum(value)

Convert a string to a Unum object.

Source code in hera/utils/unitHandler.py
@deprecated(reason="Doesn't work for some cases")
def strToUnum(value):
    """Convert a string to a Unum object.

    Already-Unum values are returned unchanged.  Other values are evaluated
    as Python expressions (e.g. ``"1*m/s"``); anything that fails to
    evaluate is returned as-is.

    Parameters
    ----------
    value : Unum or str
        The value to convert.

    Returns
    -------
    Unum, or the original value when conversion is impossible.
    """
    if isinstance(value, Unum):
        return value
    try:
        # SECURITY NOTE: eval on arbitrary input is unsafe; only use on
        # trusted configuration strings.
        # The previous bare ``except:`` also swallowed SystemExit and
        # KeyboardInterrupt; narrowed to Exception.
        return eval(str(value))
    except Exception:
        return value

extractUnumUnitsFromPint(pint_quantity)

Extract unum unit equivalent from a pint Quantity.

Source code in hera/utils/unitHandler.py
def extractUnumUnitsFromPint(pint_quantity):
    """Build the Unum unit object equivalent to a pint Quantity's unit part."""
    result = 1 * _m / _m  # dimensionless starting point
    for name, exponent in pint_quantity._units.items():
        if name not in PINT_TO_UNUM_MAP:
            raise ValueError(f"Unit '{name}' not mapped to Unum.")
        result *= PINT_TO_UNUM_MAP[name] ** exponent
    return result

pintToUnum(pint_quantity)

Convert a pint Quantity to a Unum object.

Parameters:

Name Type Description Default
pint_quantity Quantity

The pint value to convert.

required

Returns:

Type Description
Unum
Source code in hera/utils/unitHandler.py
def pintToUnum(pint_quantity):
    """
    Convert a pint Quantity to a Unum object.

    Parameters
    ----------
    pint_quantity : Quantity
        The pint value to convert.

    Returns
    -------
    Unum

    Raises
    ------
    ValueError
        If one of the quantity's units has no Unum equivalent.
    """
    # Delegate the unit translation to extractUnumUnitsFromPint instead of
    # duplicating its mapping loop here.
    return pint_quantity.magnitude * extractUnumUnitsFromPint(pint_quantity)

unumToPint(unum_obj, value=1.0)

Convert a Unum object to a pint Quantity.

Parameters:

Name Type Description Default
unum_obj Unum

The unum value to convert.

required
value float

Optional numerical value.

1.0

Returns:

Type Description
Quantity
Source code in hera/utils/unitHandler.py
def unumToPint(unum_obj, value=1.0):
    """
    Convert a Unum object to a pint Quantity.

    Parameters
    ----------
    unum_obj : Unum
        The unum value to convert.
    value : float
        Optional numerical value.

    Returns
    -------
    Quantity
    """
    # Already a pint Quantity: nothing to do.
    if isinstance(unum_obj, Quantity):
        return unum_obj
    # Render the unum as text, then rewrite it into a pint-parsable expression.
    expression = convert_unum_units_to_eval_str(str(unum_obj))
    return value * ureg.parse_expression(expression)

unumToBaseUnits(unum_obj)

Convert a Unum object to MKS base units.

Source code in hera/utils/unitHandler.py
def unumToBaseUnits(unum_obj):
    """Return *unum_obj* converted to MKS (SI base) units."""
    # Round-trip through pint, which knows how to reduce to base units.
    return pintToUnum(unumToPint(unum_obj).to_base_units())

compareJSONS(longFormat=False, changeDotToUnderscore=False, **kwargs)

Receives a {name: json} mapping in kwargs and compares the values of all the common fields. Returns a pandas df of the result.

Returns:

Type Description
DataFrame: df of the comparison results
Source code in hera/utils/jsonutils.py
def compareJSONS(longFormat=False, changeDotToUnderscore=False, **kwargs):
    """Receives {<name>:<json>} mapping in kwargs and compares the values of all the common fields. Returns pandas df of the result.

    Parameters
    ----------
        longFormat (bool, optional): Return in long/wide format. Defaults to False.
        changeDotToUnderscore (bool, optional): instead of showing fields as x.y.z uses x_y_z. This allows using pandas.query function. Defaults to False.

    Returns
    -------
        DataFrame: df of the comparison results
    """
    # Flatten each named JSON into a parameter table, tagged with its name.
    frames = [
        convertJSONtoPandas(data).assign(datasetName=name)
        for name, data in kwargs.items()
    ]
    fulldata = pandas.concat(frames)
    return compareDataframeConfigurations(
        fulldata,
        datasetName="datasetName",
        parameterName="parameterNameFullPath",
        longFormat=longFormat,
        changeDotToUnderscore=changeDotToUnderscore,
    )

ConfigurationToJSON(valueToProcess, standardize=False, splitUnits=False, keepOriginalUnits=True)

Converts a configuration dict (that might include unum objects) to JSON dict (where all the values are strings). The unum objects are converted to Str in a way that allows for their retrieval. (see the JSONToConfiguration function)

Parameters:

Name Type Description Default
conf dict

A key-value dict where the values are string. Converts unum objects to string-like representations: 1*m/s --> '1*m/s'.

required
standardize bool

If true, converts the units to MKS.

False
splitUnits bool

If true splits the unum object to number and units list. The units are [MKS units,original units]. The units are saved in field _units if keepUnits is True.

The JSONToConfiguration locates the _units and assembles them. This allows the user to query the data using the __lt and __lg and maintain the right units of the comparison.

False
keepOriginalUnits bool

If true, and the splitUnits is true then saved in _units if keepUnits is True. If false, then the units are not saved. This is used in the getDocuments to build the query dict. There, we don't need the units in the query.

True

Returns:

Type Description
dict with all the values as string
Source code in hera/utils/jsonutils.py
def _pintQuantityToJSON(origPintObj, standardize, splitUnits, keepOriginalUnits):
    """Serialize a pint Quantity for JSON: a plain string, or (splitUnits)
    a dict of magnitude/units plus the original-unit 'value' when requested."""
    pintObj = origPintObj.to_base_units() if standardize else origPintObj
    if not splitUnits:
        return str(pintObj)
    ret = dict(magnitude=pintObj.magnitude)
    ret['units'] = str(pintObj)
    if keepOriginalUnits:
        ret['value'] = str(origPintObj)
    return ret


def ConfigurationToJSON(valueToProcess, standardize=False, splitUnits=False, keepOriginalUnits=True):
    """Converts a configuration dict (that might include unum objects) to JSON dict (where all the values are strings).
    The unum objects are converted to Str in a way that allows for their retrieval. (see the JSONToConfiguration function)

    Parameters
    ----------
    valueToProcess : dict
        A key-value dict where the values are string.
        Converts unum objects to string like representations:
            1*m/s --> '1*m/s'.

    standardize : bool
        If true, converts the units to MKS.

    splitUnits : bool
        If true splits the unum object to number and units list.
        The units are [MKS units,original units].
        The units are saved in field <field name>_units if keepUnits is True.

        The JSONToConfiguration locates the <field name>_units and assembles them.
        This allows the user to query the data using the __lt and __lg and maintain the
        right units of the comparison.

    keepOriginalUnits : bool
        If true, and the splitUnits is true then saved in <field name>_units if keepUnits is True.
        If false, then the units are not saved. This is used in the getDocuments to build the query
        dict. There, we don't need the units in the query.

    Returns
    -------
        dict with all the values as string

    """
    logger = logging.getLogger("hera.bin.ConfigurationToJSON")
    ret = {}
    logger.debug(f"Processing {valueToProcess}")
    if isinstance(valueToProcess, dict):

        for key, value in valueToProcess.items():
            logger.debug("\t dictionary, calling recursively")
            ret[key] = ConfigurationToJSON(value, standardize=standardize, splitUnits=splitUnits,
                                           keepOriginalUnits=keepOriginalUnits)
    elif isinstance(valueToProcess, list):
        logger.debug("\t list, transforming to str every item")
        ret = [ConfigurationToJSON(x, standardize=standardize, splitUnits=splitUnits,
                                   keepOriginalUnits=keepOriginalUnits) for x in valueToProcess]
    elif "'" in str(valueToProcess):
        # Quoted values are treated as literal strings.
        logger.debug(f"\t {valueToProcess} is String, use as is")
        ret = valueToProcess
    elif isinstance(valueToProcess, Unum):
        logger.debug(f"\t Try to convert *{valueToProcess}* to string")
        # Normalize the unum through pint, then share the same serialization
        # path as native pint quantities (the two branches used to duplicate
        # this code verbatim).
        ret = _pintQuantityToJSON(unumToPint(valueToProcess), standardize, splitUnits, keepOriginalUnits)
    elif isinstance(valueToProcess, Quantity):
        ret = _pintQuantityToJSON(valueToProcess, standardize, splitUnits, keepOriginalUnits)
    else:
        ret = valueToProcess

    return ret

JSONToConfiguration(valueToProcess, returnUnum=False, returnStandardize=False)

Converts a JSON dictionary (where all the values are strings) back to a configuration object. This is the inverse of the ConfigurationToJSON function.

Parameters:

Name Type Description Default
JSON dict

A key-value where all the values are strings. The unum objects have the format '1*<unit>' (for example '1*m/s')

required
returnUnum bool [Default True]

If true convert a quantity to Unum.

False
returnStandardize

If true, return the units in MKS. If False return the original units.

False

Returns:

Type Description
A dict with the values converted to unum.
Source code in hera/utils/jsonutils.py
def JSONToConfiguration(valueToProcess,returnUnum=False,returnStandardize=False):
    """Converts a JSON dictionary (where all the values are strings) back to a configuration object.
    This is the inverse of the ConfigurationToJSON function.

    Parameters
    ----------
    valueToProcess : dict
        A key-value where all the values are strings.
        The unum objects have the format '1*<unit>' (for example '1*m/s')

    returnUnum : bool [Default False]
        If true convert a quantity to Unum.

    returnStandardize: bool [Default False]
        If true, return the  units in MKS. If False return the original units.

    Returns
    -------
        A dict with the values converted to unum/pint quantities.

    """
    logger = logging.getLogger("hera.util.JSONToConfiguration")

    logger.debug(f"Processing {valueToProcess} of type {type(valueToProcess)}")
    if isinstance(valueToProcess,dict):
        # A dict of exactly {magnitude, units, value} is a split-unit quantity
        # produced by ConfigurationToJSON(splitUnits=True); reassemble it.
        if ('magnitude' in valueToProcess) and ('units' in valueToProcess) and ('value' in valueToProcess) and len(valueToProcess)==3:
            ret = ureg(valueToProcess['value']) if not returnStandardize else ureg(valueToProcess['units'])
            ret = pintToUnum(ret) if returnUnum else ret
        else:
            ret = {}
            for key, value in valueToProcess.items():
                logger.debug(f"Transforming key: {key}")
                ret[key] = JSONToConfiguration(value,returnUnum=returnUnum,returnStandardize=returnStandardize)
    elif isinstance(valueToProcess,list):
        logger.debug("\t list, transforming to unum every item")
        ret = [JSONToConfiguration(x,returnUnum=returnUnum,returnStandardize=returnStandardize) for x in valueToProcess]
    elif isinstance(valueToProcess,Unum):
        ret = valueToProcess if returnUnum else unumToPint(valueToProcess)
    elif isinstance(valueToProcess,Quantity):
        ret = pintToUnum(valueToProcess) if returnUnum else valueToProcess
    elif isinstance(valueToProcess,int):
        ret = valueToProcess
    elif isinstance(valueToProcess,float):
        ret = valueToProcess
    elif "'" in str(valueToProcess):
        logger.debug(f"\t {valueToProcess} is a String, use as is")
        ret = valueToProcess
    else:
        # Last resort: try to parse the string as a pint expression; anything
        # unparsable is returned unchanged.
        logger.debug(f"\t Try to convert {valueToProcess} to unum")
        try:
            ret = ureg(valueToProcess)
            ret = pintToUnum(ret) if returnUnum else ret
        except (UndefinedUnitError, DimensionalityError, ValueError):
            ret = valueToProcess

    return ret

stripConfigurationUnits(valueToProcess, returnStandardize=False, ignoreStandardization=[])

Converts a dictionary to a JSON where all the values with Unum or Pint units get converted to their magnitude

Parameters:

Name Type Description Default
JSON dict

The JSON to strip the units from

required
returnStandardize

If true, return the units in MKS. If False return the original units.

False

Returns:

Type Description
Same json with all units stripped leaving just magnitudes
Source code in hera/utils/jsonutils.py
def stripConfigurationUnits(valueToProcess, returnStandardize=False, ignoreStandardization=None):
    """Converts a dictionary to a JSON where all the values with Unum or Pint units get converted to their magnitude

    Parameters
    ----------
    valueToProcess : dict
        The JSON to strip the units from

    returnStandardize: bool [Default False]
        If true, return the  units in MKS. If False return the original units.

    ignoreStandardization: list [Default None]
        Keys whose sub-trees should not be standardized to MKS.

    Returns
    -------
        Same json with all units stripped leaving just magnitudes

    """
    logger = logging.getLogger("hera.utils.stripConfigurationUnits")
    # BUGFIX: the default used to be a shared mutable list ([]); use a None
    # sentinel instead (backward compatible — the list was never mutated).
    if ignoreStandardization is None:
        ignoreStandardization = []
    logger.debug(f"Processing {valueToProcess}")

    if isinstance(valueToProcess, dict):
        stripped = {}
        for key, value in valueToProcess.items():
            logger.debug("\t dictionary, calling recursively")
            # Children under ignored keys keep their original units.
            childStandardize = returnStandardize and key not in ignoreStandardization
            stripped[key] = stripConfigurationUnits(value, returnStandardize=childStandardize,
                                                    ignoreStandardization=ignoreStandardization)
        return stripped

    if isinstance(valueToProcess, list):
        logger.debug("\t list, transforming to str every item")
        return [stripConfigurationUnits(item, returnStandardize=returnStandardize,
                                        ignoreStandardization=ignoreStandardization)
                for item in valueToProcess]

    if isinstance(valueToProcess, Unum):
        logger.debug(f"\t Try to convert *{valueToProcess}* to string")
        quantity = unumToPint(valueToProcess)
        quantity = quantity.to_base_units() if returnStandardize else quantity
        return quantity.magnitude

    if isinstance(valueToProcess, Quantity):
        quantity = valueToProcess.to_base_units() if returnStandardize else valueToProcess
        return quantity.magnitude

    return valueToProcess

loadJSON(jsonData)

Reads the json object to the memory. Could be: * file object: any file-like object with the property 'read'. * str: either the JSON or a path to the directory. * dict: the JSON object.

Parameters:

Name Type Description Default
jsonData str, object file, path to disk, dict

The object that contains the dict.

required

Returns:

Type Description
dict

The loaded JSON.

Source code in hera/utils/jsonutils.py
def loadJSON(jsonData):
    """Reads the json object to the memory.
    Could be:
    * file object: any file-like object with the property 'read'.
    * str: either the JSON or a path to the directory.
    * dict: the JSON object.

    Parameters
    ----------
    jsonData : str, object file, path to disk, dict
        The object that contains the dict.

    Returns
    -------
        dict
        The loaded JSON.

    Raises
    ------
    ValueError
        If the string is neither valid JSON nor an existing path, or if the
        input type is not supported.
    """

    if hasattr(jsonData, 'read'):
        # File-like object.
        loadedjson = json.load(jsonData)
    elif isinstance(jsonData, str):
        if os.path.exists(jsonData):
            with open(jsonData) as jsonFile:
                loadedjson = json.load(jsonFile)
        else:
            # Tolerate Python-literal style JSON (single quotes, True/False/None).
            # NOTE: this textual replace is best-effort and can corrupt string
            # values that legitimately contain these substrings.
            try:
                loadedjson = json.loads(jsonData.replace("'",'"').replace("True","true").replace("False","false").replace("None","null"))
            except JSONDecodeError as e:
                # BUGFIX: message read "bad format of file does not exist".
                raise ValueError(f" {str(e)}: Got {jsonData}, either bad format or file does not exist")

    elif isinstance(jsonData, (dict, list)):
        # Already parsed: return as-is (the two branches were previously duplicated).
        loadedjson = jsonData
    else:
        err = f"workflow type: {type(jsonData)} is unknown. Must be str, file-like or dict. "
        raise ValueError(err)

    return loadedjson

processJSONToPandas(jsonData, nameColumn='parameterName', valueColumn='value')

Transforms a JSON to pandas, flattens list items and names them according to their order

The example the JSON :
{
  "nodes": {
      "a" : {
        "x" : 1,
        "y" : 2,
        "z" : 3
      },
       "b" : {
        "r" : 1,
        "t" : 2,
        "y" : [3,2,4,5,6]
      }
  }
}

will be converted to
parameterName  value

0 nodes.a.x 1 1 nodes.a.y 2 2 nodes.a.z 3 3 nodes.b.r 1 4 nodes.b.t 2 5 nodes.b.y_0 3 6 nodes.b.y_1 2 7 nodes.b.y_2 4 8 nodes.b.y_3 5 9 nodes.b.y_4 6

Notes:

    - Currently does not support JSON whose root is a list.
      [
        {  "a" : 1 },
        {  "b" : 2}

      ]
        It will be supported if needed in the future.
Parameters
jsonData : dict
    the JSON data (a dict)

nameColumn: str
    The name of the parameter column name

valueColumn : str
    The name of the value
Source code in hera/utils/jsonutils.py
def processJSONToPandas(jsonData, nameColumn="parameterName", valueColumn="value"):
    """
        Transforms a JSON to pandas, flattening list items and naming them according to their order.

        For example the JSON :
        {
          "nodes": {
              "a" : {
                "x" : 1,
                "y" : 2,
                "z" : 3
              },
               "b" : {
                "r" : 1,
                "t" : 2,
                "y" : [3,2,4,5,6]
              }
          }
        }

        will be converted to

        parameterName  value
        --------------------
    0     nodes.a.x     1
    1     nodes.a.y     2
    2     nodes.a.z     3
    3     nodes.b.r     1
    4     nodes.b.t     2
    5   nodes.b.y_0     3
    6   nodes.b.y_1     2
    7   nodes.b.y_2     4
    8   nodes.b.y_3     5
    9   nodes.b.y_4     6

        Notes:

            - Currently does not support JSON whose root is a list.
              [
                {  "a" : 1 },
                {  "b" : 2}

              ]
                It will be supported if needed in the future.

        Parameters
        ----------

        jsonData : dict
            the JSON data (a dict)

        nameColumn: str
            The name of the parameter-name (JSON path) column in the result.

        valueColumn : str
            The name of the value column in the result.

        Returns
        -------
        pandas.DataFrame
            Two columns (nameColumn, valueColumn), one row per flattened leaf value.
    """
    # Flatten the dict to dotted paths (one row per path), then explode any
    # list values so each list member gets its own row sharing the same path.
    pnds = pandas.json_normalize(jsonData).T.reset_index().rename(columns={'index': nameColumn, 0: valueColumn})\
        .explode(valueColumn,ignore_index=True)\
        .reset_index()
    pnds[nameColumn] = pnds[nameColumn].astype(str)

    # Handles nested lists: keep on exploding until everything is flat.
    while True:
        # Paths that appear more than once came from an exploded list;
        # suffix them with their position: path -> path_0, path_1, ...
        listParameters = pnds.groupby(nameColumn).count().query(f"{valueColumn}>1").index
        for pname in listParameters:
            counter = 0
            I = pnds.apply(lambda x: x[nameColumn]==pname,axis=1)
            for indx, dta in pnds[I].iterrows():
                    pnds.loc[indx, nameColumn] = f"{pname}_{counter}"
                    counter += 1

        # Handling dicts. (that were inside a list, and therefore were not exploded).
        # So now we need to:
        # 1. find all the dict lines
        # 2. make a new data frame from each key,
        # 3. add the path of the father
        # 4. remove the old value
        # 5. concat all the new pandas.
        pandasAfterDictExpansion = []
        I = pnds.apply(lambda x: isinstance(x[valueColumn],dict),axis=1)
        for indx,data in pnds[I].iterrows():
            fatherPath = pnds.loc[indx][nameColumn]
            mapDataframe = pandas.json_normalize(pnds.loc[indx][valueColumn]).T.reset_index().rename(columns={'index': nameColumn, 0: valueColumn})
            for newIndx,newData in mapDataframe.iterrows():
                currentName = newData[nameColumn]
                mapDataframe.loc[newIndx, nameColumn] = f"{fatherPath}_{currentName}"

            pandasAfterDictExpansion.append(mapDataframe)

        pnds = pnds.drop(pnds[I].index)
        pnds = pandas.concat([pnds]+pandasAfterDictExpansion,ignore_index=True).drop("index",axis=1).reset_index()

        # Handles lists with 1 item: explode() would not change row count for
        # them, so unwrap manually and still give the member a _0 suffix.
        for I, dta in pnds.iterrows():
            if isinstance(dta[valueColumn],list):
                if len(dta[valueColumn]) ==1:
                    pnds.loc[I, nameColumn] = f"{pnds.loc[I][nameColumn]}_{0}"
                    pnds.at[I, valueColumn] = pnds.loc[I][valueColumn][0]

        # Explode again; if the row count did not grow, nothing was nested
        # any deeper and we are done.
        tmp = pnds.explode(valueColumn,ignore_index=True)
        if len(tmp) == len(pnds):
            break
        else:
            pnds = tmp



    return pnds[[nameColumn,valueColumn]]

convertJSONtoPandas(jsonData, nameColumn='parameterNameFullPath', valueColumn='value')

converts a JSON (either in file or loaded, or json str) to pandas. The pandas flattens the JSON using the json path convention. e.g { "a" : { "b" : 1, "c" : [1,2,3] } } will be converted to a.b 1 a.c_0 1 a.c_1 2 a.c_2 3 Does not support (currently) JSON whose root is a list but only supports dict

Parameters:

Name Type Description Default
jsonData (str, dict)

A json data either a file name, a json dict string, or a dict.

required
nameColumn

The name of the parameter column name

'parameterNameFullPath'
valueColumn str

The name of the value

'value'

Returns:

Type Description
pandas.DataFrame

with the fields nameColumn (the path of the json) and valueColumn

Source code in hera/utils/jsonutils.py
def convertJSONtoPandas(jsonData, nameColumn="parameterNameFullPath", valueColumn="value"):
    """converts a JSON (either in file or loaded, or json str) to pandas.
    The pandas flattens the JSON using the json path convention.
    e.g
    {
        "a" : {
            "b" : 1,
            "c" : [1,2,3]
        }
    }
    will be converted to
        a.b  1
        a.c_0 1
        a.c_1 2
        a.c_2 3
    Does not support (currently) JSON whose root is a list but only supports dict

    Parameters
    ----------
    jsonData : str,dict
        A json data either a file name, a json dict string, or a dict.


    nameColumn: str
        The name of the parameter column name

    valueColumn : str
        The name of the value

    Returns
    -------
        pandas.DataFrame
        with the fields nameColumn (the path of the json) and valueColumn
    """
    param1 = loadJSON(jsonData)
    pnds1 = processJSONToPandas(param1, nameColumn=nameColumn, valueColumn=valueColumn)

    # processJSONToPandas may leave dict values behind (dicts that were nested
    # inside lists); keep flattening until no dict value remains.
    # Bug fix: this loop previously hardcoded the column names "parameterName",
    # ".value" and "parameterNameFullPath" instead of using the nameColumn /
    # valueColumn parameters, so it raised (KeyError/AttributeError) whenever a
    # dict value survived the first pass.
    dictIndex = pnds1.apply(lambda x: isinstance(x[valueColumn], dict), axis=1)
    while dictIndex.sum() > 0:
        base = [pnds1[~dictIndex]]

        toProcessList = pnds1[dictIndex].set_index(nameColumn)[[valueColumn]]
        for pname, data in toProcessList.iterrows():
            newdata = processJSONToPandas(data[valueColumn], nameColumn=nameColumn, valueColumn=valueColumn)
            # Prefix every child path with the parent's full path.
            newdata[nameColumn] = newdata[nameColumn].apply(lambda x: f"{pname}.{x}")
            base.append(newdata)

        pnds1 = pandas.concat(base, ignore_index=True)
        dictIndex = pnds1.apply(lambda x: isinstance(x[valueColumn], dict), axis=1)

    return pnds1

setJSONPath(base, valuesDict, inPlace=False)

Sets the variables in the values dict to the base.
That is the base is a regular JSON file:
{
    "a" : {
        "b" : 1
    },
    "c" : {
        "d" : 2
    }
}

And the values dict is a path-> value.
{
    "a.b" : 2
}

so it will return
{
    "a" : {
        "b" : 2
    },
    "c" : {
        "d" : 2
    }
}

Parameters:

Name Type Description Default
base
required
valuesDict
required
inPlace bool

If false create a copy, else change the input.

False
Source code in hera/utils/jsonutils.py
def setJSONPath(base,valuesDict,inPlace=False):
    """
        Sets the variables in the values dict to the base.
        The base is a regular JSON dict:
        {
            "a" : {
                "b" : 1
            },
            "c" : {
                "d" : 2
            }
        }

        and the values dict maps a JSON path -> value:
        {
            "a.b" : 2
        }

        so it will return
        {
            "a" : {
                "b" : 2
            },
            "c" : {
                "d" : 2
            }
        }

    Parameters
    ----------
    base : dict
        The JSON dict to update.
    valuesDict : dict
        Maps JSON paths (dot notation, with or without the "$." prefix) to new values.
    inPlace : bool
        If false create a copy, else change the input.

    Returns
    -------
    dict
        The updated JSON (the input itself when inPlace is True).
    """
    # Work on a deep copy unless the caller asked for in-place mutation.
    target = base if inPlace else copy.deepcopy(base)

    for path, newValue in valuesDict.items():
        # Normalize the path to the jsonpath root notation.
        fullPath = path if path.startswith("$.") else f"$.{path}"
        expression = parse(fullPath)
        firstMatch = list(expression.find(target))[0]
        expression.update(firstMatch, newValue)

    return target

JSONVariations(base, variationJSON, convetToBaseUnits=True)

The JSONVariations creates variations of the cartesian product of all the values between the variation groups. Parameters within the variation group change together. Hence, all the members of one variation group must have an identical number of values.

Parameters:

Name Type Description Default
base base json to apply variations to
required
variationJSON list of variation groups
required
Source code in hera/utils/jsonutils.py
def JSONVariations(base,variationJSON,convetToBaseUnits=True):
    """The JSONVariations creates variations of the cartesian product of all the values between
    the variation groups. Parameters within the variation group change together. Hence, all the
    members of one variation group must have an identical number of values.

    Parameters
    ----------
    base : dict
        Base json to apply variations to.
    variationJSON : list
        List of variation groups.
    convetToBaseUnits : bool
        Passed through to JSONvariationItem.

    Yields
    ------
    dict
        A copy of ``base`` with one combination of variation values applied.
    """
    # Expand each variation group into its list of path->value dicts.
    expandedGroups = []
    for variation in variationJSON:
        groupItems = list(JSONvariationItem(dict(base), variation, convetToBaseUnits=convetToBaseUnits))
        expandedGroups.append(groupItems)

    # Cartesian product across groups: merge one item from each group and apply.
    for combination in product(*expandedGroups):
        merged = {}
        for groupItem in combination:
            merged.update(groupItem)

        yield setJSONPath(base, merged)

Data Utilities

hera.utils.data.toolkit.dataToolkit

Bases: abstractToolkit

Toolkit for managing data repositories (replacing the old hera-data).

It is initialized only with the DEFAULT project.

The structure of a datasource file is:

{
    "<toolkit name>": {
        "<datasource name>": {
            "resource": "<location of datasource>",
            "dataFormat": "<type of data source>",
            "desc": {
                ... metadata ...
            }
        },
        ...
    },
    ...
}
Source code in hera/utils/data/toolkit.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
class dataToolkit(abstractToolkit):
    """
    Toolkit for managing data repositories (replacing the old hera-data).

    It is initialized only with the DEFAULT project.

    The structure of a datasource file is:

        {
            "<toolkit name>": {
                "<datasource name>": {
                    "resource": "<location of datasource>",
                    "dataFormat": "<type of data source>",
                    "desc": {
                        ... metadata ...
                    }
                },
                ...
            },
            ...
        }
    """

    def __init__(self, connectionName=None):
        """
        Initialize the dataToolkit on the default project.

        Parameters
        ----------
        connectionName : str, optional
            The DB connection name. If None, uses the current OS username.
        """
        # This toolkit always works against the DEFAULT project.
        super().__init__(
            toolkitName="heradata",
            projectName=self.DEFAULTPROJECT,
            filesDirectory=None,
            connectionName=connectionName,
        )

    def addRepository(self, repositoryName, repositoryPath, overwrite=False):
        """
        Register a repository JSON file as a data source.

        Parameters
        ----------
        repositoryName : str
            The name to register the repository under.
        repositoryPath : str
            Path to the repository JSON file. A ``.json`` extension is appended if missing.
        overwrite : bool
            If True, overwrite an existing repository with the same name.
        """
        self._allowWritingToDefaultProject = True  # allows the addition of datasource to the Default project.
        try:
            # Bug fix: the old check was `"json" not in repositoryPath`, a substring
            # test that wrongly skipped appending the extension for any path that
            # merely contains 'json' (e.g. '/data/json_repos/base').
            if not repositoryPath.endswith(".json"):
                repositoryPath = f"{repositoryPath}.json"
            self.addDataSource(dataSourceName=repositoryName, resource=os.path.abspath(repositoryPath),
                               dataFormat=self.datatypes.JSON_DICT, overwrite=overwrite)
        finally:
            # Always restore the guard, even if addDataSource raises.
            self._allowWritingToDefaultProject = False

    def getRepositoryTable(self):
        """
        Return a DataFrame listing all registered repositories.

        Repositories are stored as data sources of this toolkit, so the
        data-source table is exactly the repository table.

        Returns
        -------
        pandas.DataFrame
        """
        repositoryTable = self.getDataSourceTable()
        return repositoryTable

    def getRepository(self, repositoryName):
        """
        Load and return a repository's JSON content by name.

        Parameters
        ----------
        repositoryName : str
            The name of the registered repository.

        Returns
        -------
        dict
            The parsed repository JSON.
        """
        logger = get_classMethod_logger(self, "getRepository")
        logger.info(f"Trying to find repository {repositoryName} in project {self.DEFAULTPROJECT}")

        repositoryData = self.getDataSourceData(datasourceName=repositoryName)
        return loadJSON(repositoryData)

    def loadAllDatasourcesInAllRepositoriesToProject(self, projectName, overwrite=False):
        """
        Load all data sources from all registered repositories into a project.

        A repository that fails to load (ValueError) is logged and skipped so
        the remaining repositories are still processed.

        Parameters
        ----------
        projectName : str
            The target project name.
        overwrite : bool
            If True, overwrite existing data sources.
        """
        logger = get_classMethod_logger(self, "loadAllDatasourcesInAllRepositoriesToProject")
        for repository in self.getDataSourceList():
            try:
                logger.info(f"Loading the repository {repository} to project {projectName}")
                self.loadAllDatasourcesInRepositoryToProject(projectName, repositoryName=repository,
                                                             overwrite=overwrite)
            except ValueError as e:
                # Fixed log grammar ("Did not loaded ... occured when tried").
                logger.info(
                    f"Did not load repository: {repository}, since an error occurred when trying to load it.\n The error message: {e}")

    def loadAllDatasourcesInRepositoryToProject(self, projectName, repositoryName, overwrite=False):
        """
        Load all data sources from a specific repository into a project.

        Parameters
        ----------
        projectName : str
            The target project name.
        repositoryName : str
            The name of the registered repository to load from.
        overwrite : bool
            If True, overwrite existing data sources.
        """
        logger = get_classMethod_logger(self, "loadAllDatasourcesInRepositoryToProject")
        logger.info(f"Loading repository {repositoryName}")

        # Fetch the repository document and its JSON content.
        repositoryDoc = self.getDataSourceDocument(repositoryName)
        repositoryConf = repositoryDoc.getData()
        logger.info(f"Data: {repositoryConf}")

        # Relative resource paths inside the repository resolve against the
        # directory containing the repository file itself.
        repositoryDir = os.path.dirname(repositoryDoc.resource)
        logger.info(f"basedir: {repositoryDir}")

        logger.info(f"Loading the items in {repositoryName} repository to the {projectName}")
        self.loadAllDatasourcesInRepositoryJSONToProject(
            projectName=projectName,
            repositoryJSON=repositoryConf,
            basedir=repositoryDir,
            overwrite=overwrite,
        )

    # hera/utils/data/toolkit.py  (inside class dataToolkit)
    # -----------------------------------------------------------------------------
    # Load all datasources from a repository JSON into a project.
    # If a toolkit is missing, try to auto-register it using classpath hints.
    # -----------------------------------------------------------------------------
    def getToolkitDocument(self, toolkit_name: str):
        """
        Find a dynamic toolkit document by name (either desc.datasourceName or desc.toolkit).

        Three lookups are attempted in order, each best-effort (exceptions fall
        through to the next strategy).

        Returns
        -------
        The mongoengine document, or None when nothing matches.
        """
        # Attempt 1: direct filter on datasourceName (works on most implementations).
        try:
            docs = self.getMeasurementsDocuments(
                type="ToolkitDataSource", datasourceName=toolkit_name
            )
            if docs and len(docs) > 0:
                return docs[0]
        except Exception:
            pass  # fall through to the broader scan below

        # Attempt 2: scan all ToolkitDataSource docs and match by desc fields.
        try:
            for doc in self.getMeasurementsDocuments(type="ToolkitDataSource"):
                desc = doc.desc or {}
                if toolkit_name in (desc.get("datasourceName"), desc.get("toolkit")):
                    return doc
        except Exception:
            pass

        # Attempt 3: the DataSource collection, when the project uses it.
        try:
            docs = self.getDataSourceDocuments(datasourceName=toolkit_name)
            if docs and len(docs) > 0:
                return docs[0]
        except Exception:
            pass

        return None


    def loadAllDatasourcesInRepositoryJSONToProject(self,
                                                    projectName: str,
                                                    repositoryJSON: dict,
                                                    basedir: str = "",
                                                    overwrite: bool = False,
                                                    auto_register_missing: bool = True):
        """
        Iterate through the repository JSON and load every toolkit's sections.

        For each toolkit name in ``repositoryJSON``:
        - Try to get an instance via ToolkitHome.getToolkit.
        - If no toolkit instance is obtained, skip the key quietly.
        - Otherwise dispatch each section (Config, Datasource, Measurements,
          Simulations, Cache, Function) to its handler. Handler failures are
          logged and the section is skipped.

        Parameters
        ----------
        projectName : str
            The project into which the data sources are loaded.
        repositoryJSON : dict or str
            Mapping of toolkit name -> section dict. A JSON string is parsed;
            a string starting with '/' is treated as a path placeholder
            (no data) and skipped.
        basedir : str
            Base directory for resolving relative resource paths.
        overwrite : bool
            If True, overwrite existing documents / data sources.
        auto_register_missing : bool
            Reserved for auto-registering missing toolkits from classpath
            hints; currently unused (kept for interface compatibility).

        Raises
        ------
        ValueError
            If a section key does not match any known handler.
        """
        logger = get_classMethod_logger(self, "loadAllDatasourcesInRepositoryJSONToProject")
        if isinstance(repositoryJSON, str):
            if repositoryJSON.startswith('/'):  # a path placeholder: there is no data
                logger.info("skipping dynamic toolkit")
                return
            try:
                repositoryJSON = json.loads(repositoryJSON)
            except json.JSONDecodeError:
                logger.error("repositoryJSON is a string but not a valid JSON format.")
                return
        if not isinstance(repositoryJSON, dict):
            logger.warning(f"Expected dict for repositoryJSON, got {type(repositoryJSON)}. Skipping.")
            return
        if not repositoryJSON:
            logger.info("repositoryJSON is empty. Nothing to load.")
            return

        # Section name -> handler. All handlers are invoked with the keyword
        # arguments (toolkit, itemName, docTypeDict, overwrite, basedir), so
        # every lambda must use exactly those parameter names.
        # Bug fix: the Cache lambda previously named its third parameter
        # 'itemDesc'; the keyword call (docTypeDict=...) therefore raised a
        # TypeError and every Cache section was silently skipped.
        handlerDict = dict(
            Config=self._handle_Config,
            Datasource=self._handle_DataSource,
            Measurements=lambda toolkit, itemName, docTypeDict, overwrite, basedir: self._DocumentHandler(
                toolkit, itemName, docTypeDict, overwrite, "Measurements", basedir
            ),
            Simulations=lambda toolkit, itemName, docTypeDict, overwrite, basedir: self._DocumentHandler(
                toolkit, itemName, docTypeDict, overwrite, "Simulations", basedir
            ),
            Cache=lambda toolkit, itemName, docTypeDict, overwrite, basedir: self._DocumentHandler(
                toolkit, itemName, docTypeDict, overwrite, "Cache", basedir
            ),
            Function=self._handle_Function,
        )

        tk_home = ToolkitHome(projectName=projectName)

        for toolkitName, toolkitDict in repositoryJSON.items():
            # 1) Try static/dynamic resolution via ToolkitHome.getToolkit
            try:
                toolkit = tk_home.getToolkit(toolkitName=toolkitName)
            except Exception as e:
                logger.info(f"Toolkit '{toolkitName}' not found via getToolkit: {e}")
                toolkit = None

            # 2) If we still do not have a toolkit instance, skip this key quietly
            if toolkit is None:
                logger.info(
                    f"Skipping key '{toolkitName}' in repository JSON – "
                    f"no matching toolkit and no auto-registration performed."
                )
                continue

            # 3) Dispatch sections (Config, Datasource, Measurements, Simulations, Cache, Function)
            for key, docTypeDict in toolkitDict.items():
                logger.info(f"Loading document type {key} to toolkit {toolkitName}")
                handler = handlerDict.get(key.title(), None)

                if handler is None:
                    err = (
                        f"Unknown Handler {key.title()}. "
                        f"The handler must be {', '.join(handlerDict.keys())}. "
                    )
                    logger.error(err)
                    raise ValueError(err)

                try:
                    handler(
                        toolkit=toolkit,
                        itemName=key,
                        docTypeDict=docTypeDict,
                        overwrite=overwrite,
                        basedir=basedir,
                    )
                except Exception as e:
                    err = (
                        f"The error {e} occurred while adding *{key}* to toolkit {toolkitName}... skipping!!!"
                    )
                    logger.error(err)


    def _handle_Config(self, toolkit, itemName, docTypeDict, overwrite, basedir):
        """
        Handle a Config section from a repository JSON by calling ``toolkit.setConfig``.

        Parameters
        ----------
        toolkit : abstractToolkit
            The toolkit instance to configure.
        itemName : str
            The section name (unused, always 'Config').
        docTypeDict : dict
            Key-value pairs to set as configuration.
        overwrite : bool
            Whether to overwrite existing values.
        basedir : str
            Base directory for resolving relative paths (unused for Config).
        """
        # The whole section is passed through as keyword arguments.
        configValues = dict(docTypeDict)
        toolkit.setConfig(**configValues)

    def _handle_DataSource(self, toolkit, itemName, docTypeDict, overwrite, basedir):
        """
        Handle a DataSource section from a repository JSON by adding data sources to the toolkit.

        Each entry must carry an ``isRelativePath`` flag (boolean or the strings
        'True'/'False'); an entry without it fails the assertion below. The
        resource can be given either inline (``resource``) or as a path to a
        JSON file whose contents become the resource (``resourceFilePath``).

        Parameters
        ----------
        toolkit : abstractToolkit
            The toolkit instance to add data sources to.
        itemName : str
            The section name. NOTE(review): the loop below reuses this name for
            the per-datasource key, shadowing the parameter.
        docTypeDict : dict
            Dictionary mapping data source names to their descriptions.
        overwrite : bool
            If True, overwrite existing data sources.
        basedir : str
            Base directory for resolving relative resource paths.
        """
        logger = get_classMethod_logger(self, "_handle_DataSource")

        for itemName, itemDesc in docTypeDict.items():
            theItem = itemDesc["item"]

            # The flag may be a real bool or the literal strings 'True'/'False'
            # (repository files are hand-written JSON); anything else - including
            # a missing flag - is rejected here.
            isRelativePath = itemDesc.get("isRelativePath")
            assert (isRelativePath=='True' or isRelativePath=='False') or isinstance(isRelativePath,bool), "isRelativePath must be defined as 'True' or 'False'. "


            # When both forms are present, the inline resource wins.
            if 'resource' in theItem and "resourceFilePath" in theItem:
                logger.warning(f"both resource and resourceFilePath are defined for datasource {itemName}, using just resource")
                theItem.pop("resourceFilePath")

            if 'resource' not in theItem and "resourceFilePath" in theItem:
                if isRelativePath=='True' or isRelativePath is True:
                    logger.debug(
                        f"The input is not absolute (it is relative). Adding the path {basedir} to the resource {theItem['resourceFilePath']}")
                    theItem["resourceFilePath"] = os.path.join(basedir, theItem["resourceFilePath"])

                # Replace the file path with the file's JSON contents. On read
                # failure the item keeps no 'resource' key (best-effort, logged).
                logger.info("detected dataSource resource specified using file's contents")
                try:
                    with open(theItem.pop("resourceFilePath")) as dataSourceResourceFile:
                        theItem['resource'] = json.load(dataSourceResourceFile)
                        logger.info("extracted resource from file successfully")
                except Exception as e:
                    logger.error(f"failed reading resource from file, {e}")
            else:
                # Inline resource: only resolve it against basedir when flagged relative.
                if isRelativePath=='True' or isRelativePath is True:
                    logger.debug(
                        f"The input is not absolute (it is relative). Adding the path {basedir} to the resource {theItem['resource']}")
                    theItem["resource"] = os.path.join(basedir, theItem["resource"])




            logger.debug(f"Checking if the data item {itemName} is already in project {toolkit.projectName}")
            datasource = toolkit.getDataSourceDocuments(datasourceName=itemName)
            if len(datasource) == 0 or overwrite:

                # NOTE(review): only a single existing document is removed; if
                # more than one matches, the extras are left in place - confirm
                # whether duplicates can occur here.
                if len(datasource) == 1:
                    logger.debug("Remove the old datasource")
                    toolkit.deleteDataSource(datasourceName=itemName)

                logger.debug("Adding a new datasource")
                theItem['dataSourceName'] = itemName
                theItem['overwrite'] = overwrite
                toolkit.addDataSource(**theItem)
                logger.info(f"Added source {itemName} to tool {toolkit.toolkitName} in project {toolkit.projectName}")
            else:
                logger.error(f"Source {itemName} already exists in {toolkit.projectName}. Use --overwrite to force update")

    def _DocumentHandler(self, toolkit, itemName, docTypeDict, overwrite, documentType, basedir):
        """
        Handle a Measurements, Simulations, or Cache section from a repository JSON.

        For each item: resolve its resource path, query the project for an
        existing document matching the item's metadata, and either add a new
        document, update the existing one (when ``overwrite``), or log an error.

        Parameters
        ----------
        toolkit : abstractToolkit
            The toolkit instance to add documents to.
        itemName : str
            The section name. NOTE(review): the loop below reuses this name for
            the per-document key, shadowing the parameter.
        docTypeDict : dict
            Dictionary mapping document names to their descriptions.
        overwrite : bool
            If True, overwrite existing documents.
        documentType : str
            One of 'Measurements', 'Simulations', or 'Cache'. Used to build the
            ``get<documentType>Documents`` / ``add<documentType>Document``
            method names on the toolkit.
        basedir : str
            Base directory for resolving relative resource paths.
        """
        logger = get_classMethod_logger(self, "_handle_Document")
        logger.info(f"Loading {itemName} to toolkit {toolkit.toolkitName} (ProjectName {toolkit.projectName}")
        for itemName, itemDesc in docTypeDict.items():
            theItem = itemDesc["item"]
            theItem["resource"] = self._makeItemPathAbsolute(theItem,basedir)

            logger.debug(f"Checking if the data item {itemName} is already in the project")
            retrieveFuncName = f"get{documentType}Documents"
            retrieveFunc = getattr(toolkit, retrieveFuncName)
            # NOTE(review): getattr without a default raises AttributeError, so
            # this None check is effectively unreachable; kept for safety.
            if retrieveFunc is None:
                raise ValueError(
                    f"function {retrieveFuncName} not found. Key {documentType} must be : DataSource, Measurement, Cache, or Simulation")
            # Query by the item's metadata only - resource and dataFormat are
            # storage details, not identity.
            qrydict = dict(theItem)
            del qrydict['resource']
            del qrydict['dataFormat']
            itemQry = dictToMongoQuery(qrydict)
            datasource = retrieveFunc(**itemQry)
            logger.debug(f"Found {len(datasource)} documents")

            if len(datasource) == 0:
                funcName = f"add{documentType}Document"

                logger.debug(f"Adding the document of type {documentType} using the function {funcName}")
                func = getattr(toolkit, funcName)

                func(**theItem)
                logger.info(f"Added source {itemName} to tool {toolkit.toolkitName} in project {toolkit.projectName}")

            elif overwrite:
                logger.debug("Updating an existing document")
                dataitem = datasource[0]
                dataitem['resource'] = theItem["resource"]
                dataitem['dataFormat'] = theItem['dataFormat']
                # NOTE(review): the existing document's desc values take
                # precedence over the repository's new desc (update() overlays
                # the old onto the new) - confirm this is intended for an
                # overwrite operation.
                curDesc = theItem.get("desc", {})
                curDesc.update(dataitem['desc'])
                dataitem['desc'] = curDesc
                dataitem.save()
                logger.info(f"Updated source {itemName} in tool {toolkit.toolkitName} in project {toolkit.projectName}")
            else:
                logger.error(
                    f"Source {itemName} already exists in {toolkit.projectName}. Use --overwrite to force update")

    def _handle_Function(self, toolkit, itemName, docTypeDict, overwrite, basedir):
        """
        Handle a Function section by calling named methods on this toolkit.

        Each key in ``docTypeDict`` is a method name on ``self``. The value can be:
        - A dict: passed as keyword arguments to a single call.
        - A list of dicts: each dict triggers a separate call.

        The called method must accept an ``overwrite`` keyword argument.

        Parameters
        ----------
        toolkit : abstractToolkit
            The toolkit instance (unused directly; methods are called on ``self``).
        itemName : str
            The section name.
        docTypeDict : dict
            Maps method names to their argument(s).
        overwrite : bool
            Passed to each method call.
        basedir : str
            Base directory (unused for Function).

        Raises
        ------
        ValueError
            If a value is neither a dict nor a list.
        """
        logger = get_classMethod_logger(self, "_handle_GeneralFunction")
        # The loop variable no longer reuses (and shadows) the itemName parameter.
        for funcName, funcArgs in docTypeDict.items():
            func = getattr(self, funcName)

            if isinstance(funcArgs, dict):
                func(**funcArgs, overwrite=overwrite)
            elif isinstance(funcArgs, list):
                for callArgs in funcArgs:
                    if isinstance(callArgs, dict):
                        func(**callArgs, overwrite=overwrite)
                    else:
                        err = f"{funcName} has a non dict item in the list : {callArgs}... ignoring."
                        logger.error(err)
            else:
                # Fixed message typo: "dict of a list" -> "dict or a list".
                err = f"{funcName} value must be dict or a list of dicts. "
                logger.error(err)
                raise ValueError(err)


    def _makeItemPathAbsolute(self, theItem, basedir):
        """
        Convert a resource path to absolute if the ``isRelativePath`` flag is set.

        The flag may be stored as a real boolean or as the strings
        'True'/'False' (the form repository JSON files use elsewhere in this
        module); both are handled. A missing flag defaults to True (relative).

        Parameters
        ----------
        theItem : dict
            The item data containing ``resource`` and optionally ``isRelativePath``.
        basedir : str
            Base directory to resolve relative paths against.

        Returns
        -------
        str
            The absolute resource path.
        """
        logger = get_classMethod_logger(self, "_makeItemPathAbsolute")
        isRelativePath = theItem.get("isRelativePath", True)
        if isinstance(isRelativePath, str):
            # Bug fix: the previous bool(...) cast treated the string 'False'
            # as truthy; compare the literal strings explicitly instead.
            isRelativePath = isRelativePath == 'True'
        else:
            isRelativePath = bool(isRelativePath)

        if isRelativePath:
            logger.debug(
                f"The input is not absolute (it is relative). Adding the path {basedir} to the resource {theItem['resource']}")

        return os.path.join(basedir, theItem["resource"]) if isRelativePath else theItem["resource"]

    # -------------------------------------------------------------------------
    # Direct-load helpers (no MongoDB round-trip required)
    # -------------------------------------------------------------------------

    @staticmethod
    def resolveDataSourcePaths(repositoryJSON, basedir=""):
        """
        Walk a repository JSON dict and resolve every ``resource`` field to an
        absolute path, respecting the ``isRelativePath`` flag on each entry.

        Parameters
        ----------
        repositoryJSON : dict
            The parsed repository JSON (toolkit-name -> section dict).
        basedir : str
            The base directory against which relative paths are resolved.
            Typically the directory that contains the repository JSON file.

        Returns
        -------
        dict
            A *deep copy* of ``repositoryJSON`` with all ``resource`` fields
            converted to absolute paths.
        """
        import copy
        resolved = copy.deepcopy(repositoryJSON)

        for _toolkitName, toolkitDict in resolved.items():
            if not isinstance(toolkitDict, dict):
                continue
            for sectionKey, sectionDict in toolkitDict.items():
                if not isinstance(sectionDict, dict):
                    continue
                for itemName, itemDesc in sectionDict.items():
                    if not isinstance(itemDesc, dict):
                        continue
                    # Handle entries that have an "item" wrapper
                    item = itemDesc.get("item", itemDesc)
                    if "resource" not in item:
                        continue
                    is_rel = itemDesc.get("isRelativePath", item.get("isRelativePath"))
                    if is_rel == "True" or is_rel is True:
                        item["resource"] = os.path.abspath(
                            os.path.join(basedir, item["resource"])
                        )
        return resolved

    @staticmethod
    def loadRepositoryFromPath(json_path):
        """
        Load a repository JSON file straight from disk and return its content
        with every relative ``resource`` path resolved to an absolute path
        (relative to the directory that contains the file).

        Lets tests and lightweight scripts use repository data without going
        through ``addRepository`` and MongoDB storage.

        Parameters
        ----------
        json_path : str
            Path to the repository JSON file.

        Returns
        -------
        dict
            The repository dict with all resource paths resolved to absolute.

        Raises
        ------
        FileNotFoundError
            If *json_path* does not exist.
        """
        fullPath = os.path.abspath(json_path)
        if not os.path.isfile(fullPath):
            raise FileNotFoundError(f"Repository JSON not found: {fullPath}")

        with open(fullPath, "r", encoding="utf-8") as stream:
            content = json.load(stream)

        return dataToolkit.resolveDataSourcePaths(
            content, basedir=os.path.dirname(fullPath)
        )

__init__(connectionName=None)

Initialize the dataToolkit on the default project.

Parameters:

Name Type Description Default
connectionName str

The DB connection name. If None, uses the current OS username.

None
Source code in hera/utils/data/toolkit.py
def __init__(self, connectionName=None):
    """
    Initialize the dataToolkit on the default project.

    Parameters
    ----------
    connectionName : str, optional
        The DB connection name. If None, uses the current OS username.
    """
    # Always anchored to the class-level DEFAULTPROJECT; this toolkit does
    # not use a files directory, so filesDirectory is passed as None.
    super().__init__(toolkitName="heradata", projectName=self.DEFAULTPROJECT, filesDirectory=None, connectionName=connectionName)

addRepository(repositoryName, repositoryPath, overwrite=False)

Register a repository JSON file as a data source.

Parameters:

Name Type Description Default
repositoryName str

The name to register the repository under.

required
repositoryPath str

Path to the repository JSON file. .json extension is appended if missing.

required
overwrite bool

If True, overwrite an existing repository with the same name.

False
Source code in hera/utils/data/toolkit.py
def addRepository(self, repositoryName, repositoryPath, overwrite=False):
    """
    Register a repository JSON file as a data source.

    Parameters
    ----------
    repositoryName : str
        The name to register the repository under.
    repositoryPath : str
        Path to the repository JSON file. ``.json`` extension is appended if missing.
    overwrite : bool
        If True, overwrite an existing repository with the same name.
    """
    # Temporarily allow writing to the default project so the repository
    # record can be stored there; the guard is restored even on failure.
    self._allowWritingToDefaultProject = True
    try:
        # Append the extension only when the path does not already end with
        # ".json". (The previous substring test skipped appending for any
        # path merely *containing* "json", e.g. "jsonrepos/my_repo".)
        if not repositoryPath.endswith(".json"):
            repositoryPath = f"{repositoryPath}.json"
        self.addDataSource(dataSourceName=repositoryName, resource=os.path.abspath(repositoryPath),
                           dataFormat=self.datatypes.JSON_DICT, overwrite=overwrite)
    finally:
        self._allowWritingToDefaultProject = False

getRepositoryTable()

Return a DataFrame listing all registered repositories.

Returns:

Type Description
DataFrame
Source code in hera/utils/data/toolkit.py
def getRepositoryTable(self):
    """
    Return a DataFrame listing all registered repositories.

    Repositories are stored as data sources of this toolkit, so this is a
    thin alias over ``getDataSourceTable``.

    Returns
    -------
    pandas.DataFrame
    """
    return self.getDataSourceTable()

getRepository(repositoryName)

Load and return a repository's JSON content by name.

Parameters:

Name Type Description Default
repositoryName str

The name of the registered repository.

required

Returns:

Type Description
dict

The parsed repository JSON.

Source code in hera/utils/data/toolkit.py
def getRepository(self, repositoryName):
    """
    Fetch a registered repository by name and return its parsed JSON content.

    Parameters
    ----------
    repositoryName : str
        The name of the registered repository.

    Returns
    -------
    dict
        The parsed repository JSON.
    """
    logger = get_classMethod_logger(self, "getRepository")
    logger.info(f"Trying to find repository {repositoryName} in project {self.DEFAULTPROJECT}")
    # The raw data-source payload is parsed into a dict by loadJSON.
    return loadJSON(self.getDataSourceData(datasourceName=repositoryName))

loadAllDatasourcesInAllRepositoriesToProject(projectName, overwrite=False)

Load all data sources from all registered repositories into a project.

Parameters:

Name Type Description Default
projectName str

The target project name.

required
overwrite bool

If True, overwrite existing data sources.

False
Source code in hera/utils/data/toolkit.py
def loadAllDatasourcesInAllRepositoriesToProject(self, projectName, overwrite=False):
    """
    Load all data sources from all registered repositories into a project.

    Repositories that raise a ValueError during loading are skipped and
    reported; the loop continues with the remaining repositories.

    Parameters
    ----------
    projectName : str
        The target project name.
    overwrite : bool
        If True, overwrite existing data sources.
    """
    logger = get_classMethod_logger(self, "loadAllDatasourcesInAllRepositoriesToProject")
    for repository in self.getDataSourceList():
        try:
            logger.info(f"Loading the repository {repository} to project {projectName}")
            self.loadAllDatasourcesInRepositoryToProject(projectName, repositoryName=repository,
                                                         overwrite=overwrite)
        except ValueError as e:
            # Message grammar fixed ("Did not loaded ... occured").
            logger.info(
                f"Did not load repository {repository} because an error occurred while loading it.\nThe error message: {e}")

loadAllDatasourcesInRepositoryToProject(projectName, repositoryName, overwrite=False)

Load all data sources from a specific repository into a project.

Parameters:

Name Type Description Default
projectName str

The target project name.

required
repositoryName str

The name of the registered repository to load from.

required
overwrite bool

If True, overwrite existing data sources.

False
Source code in hera/utils/data/toolkit.py
def loadAllDatasourcesInRepositoryToProject(self, projectName, repositoryName, overwrite=False):
    """
    Load all data sources from a specific repository into a project.

    Parameters
    ----------
    projectName : str
        The target project name.
    repositoryName : str
        The name of the registered repository to load from.
    overwrite : bool
        If True, overwrite existing data sources.
    """
    logger = get_classMethod_logger(self, "loadAllDatasourcesInRepositoryToProject")
    logger.info(f"Loading repository {repositoryName}")

    repositoryDocument = self.getDataSourceDocument(repositoryName)
    conf = repositoryDocument.getData()
    logger.info(f"Data: {conf}")

    # Relative resource paths inside the repository are resolved against the
    # directory that holds the repository JSON file.
    basedir = os.path.dirname(repositoryDocument.resource)
    logger.info(f"basedir: {basedir}")

    logger.info(f"Loading the items in {repositoryName} repository to the {projectName}")
    self.loadAllDatasourcesInRepositoryJSONToProject(projectName=projectName,
                                                     repositoryJSON=conf,
                                                     basedir=basedir,
                                                     overwrite=overwrite)

getToolkitDocument(toolkit_name: str)

Find a dynamic toolkit document by name (either desc.datasourceName or desc.toolkit). Returns the mongoengine document or None.

Source code in hera/utils/data/toolkit.py
def getToolkitDocument(self, toolkit_name: str):
    """
    Find a dynamic toolkit document by name (either desc.datasourceName or desc.toolkit).
    Returns the mongoengine document or None.

    Three lookup strategies are tried in order; each one swallows its own
    exceptions so a failing backend query simply falls through to the next.
    """
    # Strategy 1: direct filter on datasourceName (works on most implementations).
    try:
        docs = self.getMeasurementsDocuments(
            type="ToolkitDataSource", datasourceName=toolkit_name
        )
        if docs and len(docs) > 0:
            return docs[0]
    except Exception:
        # fall through to the broader scan below
        pass

    # Strategy 2: scan all ToolkitDataSource docs and match on desc fields.
    try:
        for doc in self.getMeasurementsDocuments(type="ToolkitDataSource"):
            meta = doc.desc or {}
            if toolkit_name in (meta.get("datasourceName"), meta.get("toolkit")):
                return doc
    except Exception:
        pass

    # Strategy 3: optionally look in the DataSource collection as well.
    try:
        docs = self.getDataSourceDocuments(datasourceName=toolkit_name)
        if docs and len(docs) > 0:
            return docs[0]
    except Exception:
        pass

    return None

loadAllDatasourcesInRepositoryJSONToProject(projectName: str, repositoryJSON: dict, basedir: str = '', overwrite: bool = False, auto_register_missing: bool = True)

Iterate through the repository JSON and for each toolkit: - Try to get an instance via ToolkitHome.getToolkit. - If missing and auto_register_missing=True, attempt auto-register ONLY if there is a clear classpath hint in the JSON (Registry.classpath or Registry.cls). - After we have a valid instance, dispatch to the appropriate handler per section.

Source code in hera/utils/data/toolkit.py
def loadAllDatasourcesInRepositoryJSONToProject(self,
                                                projectName: str,
                                                repositoryJSON: dict,
                                                basedir: str = "",
                                                overwrite: bool = False,
                                                auto_register_missing: bool = True):
    """
    Iterate through the repository JSON and for each toolkit:
    - Try to get an instance via ToolkitHome.getToolkit.
    - If missing and auto_register_missing=True, attempt auto-register ONLY if there is
      a clear classpath hint in the JSON (Registry.classpath or Registry.cls).
    - After we have a valid instance, dispatch to the appropriate handler per section.

    Parameters
    ----------
    projectName : str
        The target project name.
    repositoryJSON : dict or str
        The repository content. A JSON string is parsed first; a plain
        path-like string (starting with '/') marks a data-less dynamic
        toolkit and is skipped.
    basedir : str
        Base directory for resolving relative resource paths.
    overwrite : bool
        If True, overwrite existing documents/data sources.
    auto_register_missing : bool
        Reserved for auto-registering unknown toolkits (not used in this body).

    Raises
    ------
    ValueError
        If a section name does not map to a known handler.
    """
    logger = get_classMethod_logger(self, "loadAllDatasourcesInRepositoryJSONToProject")
    if isinstance(repositoryJSON, str):
        if repositoryJSON.startswith('/'):  # a bare path means there is no data
            logger.info("skipping dynamic toolkit")
            return
        try:
            repositoryJSON = json.loads(repositoryJSON)
        except json.JSONDecodeError:
            logger.error("repositoryJSON is a string but not a valid JSON format.")
            return
    if not isinstance(repositoryJSON, dict):
        logger.warning(f"Expected dict for repositoryJSON, got {type(repositoryJSON)}. Skipping.")
        return
    if not repositoryJSON:
        logger.info("repositoryJSON is empty. Nothing to load.")
        return

    # Every handler is invoked with the keyword arguments
    # (toolkit, itemName, docTypeDict, overwrite, basedir), so the lambdas
    # must use exactly those parameter names. The Cache lambda previously
    # named its third parameter ``itemDesc``, which made every Cache section
    # raise a TypeError that was silently swallowed by the dispatch except.
    handlerDict = dict(
        Config=self._handle_Config,
        Datasource=self._handle_DataSource,
        Measurements=lambda toolkit, itemName, docTypeDict, overwrite, basedir: self._DocumentHandler(
            toolkit, itemName, docTypeDict, overwrite, "Measurements", basedir
        ),
        Simulations=lambda toolkit, itemName, docTypeDict, overwrite, basedir: self._DocumentHandler(
            toolkit, itemName, docTypeDict, overwrite, "Simulations", basedir
        ),
        Cache=lambda toolkit, itemName, docTypeDict, overwrite, basedir: self._DocumentHandler(
            toolkit, itemName, docTypeDict, overwrite, "Cache", basedir
        ),
        Function=self._handle_Function,
    )

    tk_home = ToolkitHome(projectName=projectName)

    for toolkitName, toolkitDict in repositoryJSON.items():
        # 1) Try static/dynamic resolution via ToolkitHome.getToolkit
        try:
            toolkit = tk_home.getToolkit(toolkitName=toolkitName)
        except Exception as e:
            logger.info(f"Toolkit '{toolkitName}' not found via getToolkit: {e}")
            toolkit = None

        # 2) If we still do not have a toolkit instance, skip this key quietly
        if toolkit is None:
            logger.info(
                f"Skipping key '{toolkitName}' in repository JSON – "
                f"no matching toolkit and no auto-registration performed."
            )
            continue

        # 3) Dispatch sections (Config, Datasource, Measurements, Simulations, Cache, Function)
        for key, docTypeDict in toolkitDict.items():
            logger.info(f"Loading document type {key} to toolkit {toolkitName}")
            handler = handlerDict.get(key.title(), None)

            if handler is None:
                err = (
                    f"Unknown Handler {key.title()}. "
                    f"The handler must be {', '.join(handlerDict.keys())}. "
                )
                logger.error(err)
                raise ValueError(err)

            try:
                handler(
                    toolkit=toolkit,
                    itemName=key,
                    docTypeDict=docTypeDict,
                    overwrite=overwrite,
                    basedir=basedir,
                )
            except Exception as e:
                err = (
                    f"The error {e} occurred while adding *{key}* to toolkit {toolkitName}... skipping!!!"
                )
                logger.error(err)

resolveDataSourcePaths(repositoryJSON, basedir='') staticmethod

Walk a repository JSON dict and resolve every resource field to an absolute path, respecting the isRelativePath flag on each entry.

Parameters:

Name Type Description Default
repositoryJSON dict

The parsed repository JSON (toolkit-name -> section dict).

required
basedir str

The base directory against which relative paths are resolved. Typically the directory that contains the repository JSON file.

''

Returns:

Type Description
dict

A deep copy of repositoryJSON with all resource fields converted to absolute paths.

Source code in hera/utils/data/toolkit.py
@staticmethod
def resolveDataSourcePaths(repositoryJSON, basedir=""):
    """
    Walk a repository JSON dict and resolve every ``resource`` field to an
    absolute path, respecting the ``isRelativePath`` flag on each entry.

    Parameters
    ----------
    repositoryJSON : dict
        The parsed repository JSON (toolkit-name -> section dict).
    basedir : str
        The base directory against which relative paths are resolved.
        Typically the directory that contains the repository JSON file.

    Returns
    -------
    dict
        A *deep copy* of ``repositoryJSON`` with all ``resource`` fields
        converted to absolute paths.
    """
    import copy
    resolved = copy.deepcopy(repositoryJSON)

    for _toolkitName, toolkitDict in resolved.items():
        if not isinstance(toolkitDict, dict):
            continue
        for sectionKey, sectionDict in toolkitDict.items():
            if not isinstance(sectionDict, dict):
                continue
            for itemName, itemDesc in sectionDict.items():
                if not isinstance(itemDesc, dict):
                    continue
                # Handle entries that have an "item" wrapper
                item = itemDesc.get("item", itemDesc)
                if "resource" not in item:
                    continue
                is_rel = itemDesc.get("isRelativePath", item.get("isRelativePath"))
                if is_rel == "True" or is_rel is True:
                    item["resource"] = os.path.abspath(
                        os.path.join(basedir, item["resource"])
                    )
    return resolved

loadRepositoryFromPath(json_path) staticmethod

Read a repository JSON file directly from disk, resolve all relative resource paths to absolute paths based on the JSON file's directory, and return the resulting dict.

This allows tests (and lightweight scripts) to work with repository data without going through addRepository + MongoDB storage.

Parameters:

Name Type Description Default
json_path str

Path to the repository JSON file.

required

Returns:

Type Description
dict

The repository dict with all resource paths resolved to absolute.

Raises:

Type Description
FileNotFoundError

If json_path does not exist.

Source code in hera/utils/data/toolkit.py
@staticmethod
def loadRepositoryFromPath(json_path):
    """
    Load a repository JSON file straight from disk and return its content
    with every relative ``resource`` path resolved to an absolute path
    (relative to the directory that contains the file).

    Lets tests and lightweight scripts use repository data without going
    through ``addRepository`` and MongoDB storage.

    Parameters
    ----------
    json_path : str
        Path to the repository JSON file.

    Returns
    -------
    dict
        The repository dict with all resource paths resolved to absolute.

    Raises
    ------
    FileNotFoundError
        If *json_path* does not exist.
    """
    fullPath = os.path.abspath(json_path)
    if not os.path.isfile(fullPath):
        raise FileNotFoundError(f"Repository JSON not found: {fullPath}")

    with open(fullPath, "r", encoding="utf-8") as stream:
        content = json.load(stream)

    return dataToolkit.resolveDataSourcePaths(
        content, basedir=os.path.dirname(fullPath)
    )