Skip to content

Data Layer API

MongoDB-backed data storage: collections, documents, data handlers, and function caching.

Collections

AbstractCollection

hera.datalayer.collection.AbstractCollection

Bases: object

Abstract collection that contains documents of a certain type

Source code in hera/datalayer/collection.py
class AbstractCollection(object):
    """
    Abstract collection that contains documents of a certain type.

    The collection wraps the database object returned by ``getDBObject`` and
    exposes query, insert and delete helpers on top of it.
    """

    # Database object used for every query; set in __init__.
    _metadataCol = None
    # Collection type name, or None when the collection spans all types.
    _type = None

    @property
    def type(self):
        """
        The collection type (e.g. 'Measurements', 'Simulations', 'Cache'), or None for all.

        Returns
        -------
        str or None
        """
        return self._type

    def __init__(self, ctype=None, connectionName=None):
        """
        Parameters
        ----------
        ctype : str or None
            Collection type name (e.g. 'Measurements'). None for all types.
        connectionName : str or None
            Optional database connection alias.
        """
        self._type = ctype
        # Without an explicit type the generic 'Metadata' object is used.
        dbObjectName = 'Metadata' if self.type is None else ctype
        self._metadataCol = getDBObject(dbObjectName, connectionName)

    def getDocumentsAsDict(self, projectName, with_id=False, **query):
        """
        Run a query and return the matching documents as plain dictionaries.

        Parameters
        ----------
        projectName : str
            The project name.

        with_id : bool, optional, default False
            Whether the id key should be included in each document dictionary.

        query :
            Query arguments, forwarded to ``getDocuments``.

        Returns
        -------
        dict
            A dict with a single 'documents' key whose value is the list of
            dictionaries that represent the documents that fulfill the query.
        """
        dictList = [doc.asDict(with_id=with_id) for doc in self.getDocuments(projectName=projectName, **query)]
        return dict(documents=dictList)

    def getDocuments(self, projectName, resource=None, dataFormat=None, type=None, **desc):
        """
        Get the documents that satisfy the given query.
        If projectName is None search over all projects.

        Parameters
        ----------
        projectName : str or None
            The project name.

        resource :
            The data resource.

        dataFormat : str
            The data format.
        type : str
            The type which the data belongs to.
        desc :
            Other metadata arguments. They are converted with
            ``ConfigurationToJSON`` and ``dictToMongoQuery`` (prefix "desc")
            before being added to the query.

        Returns
        -------
        The result of ``self._metadataCol.objects(**query)``: an iterable of the
        documents that fulfill the query (presumably a mongoengine QuerySet
        rather than a plain list -- TODO confirm).
        """
        # Only the fields that were actually supplied take part in the query.
        query = {}
        if resource is not None:
            query['resource'] = resource
        if dataFormat is not None:
            query['dataFormat'] = dataFormat
        if type is not None:
            query['type'] = type
        if projectName is not None:
            query['projectName'] = projectName

        descAsJSON = ConfigurationToJSON(desc, standardize=True, splitUnits=True, keepOriginalUnits=False)
        query.update(dictToMongoQuery(descAsJSON, prefix="desc"))

        return self._metadataCol.objects(**query)

    def _getAllValueByKey(self, key, **query):
        """Return all unique values for ``key`` across matching documents (order is not defined)."""
        return list({doc[key] for doc in self.getDocuments(projectName=None, **query)})

    def getProjectList(self):
        """
        Returns the list of unique project names in this collection.

        Returns
        -------
        list of str
        """
        return self._getAllValueByKey(key='projectName')

    def getDocumentByID(self, id):
        """
        Returns a document by its ID.

        Parameters
        ----------
        id : str
            The document ID.

        Returns
        -------
        document
            The document with the relevant ID.
        """
        return self._metadataCol.objects.get(id=id)

    def addDocument(self, projectName, resource="", dataFormat="string", type="", desc=None):
        """
        Adds a document to the database.

        Parameters
        ----------
        projectName : str
            The project to add the document to.

        resource :
            The data of the document.

        dataFormat : str
            The type of the dataformat.
            See datahandler for the available types.

        type : str
            The type of the data.

        desc : dict or None
            Holds any additional fields that describe the document.
            None (the default) is stored as a new empty dict.

        Returns
        -------
        mongoengine document
            The value returned by ``save()`` on the new document.

        Raises
        ------
        ValidationError
            If the document fails validation when it is saved.
        """
        # A fresh dict per call: a literal ``{}`` default would be one shared
        # object handed to every document created without an explicit desc.
        if desc is None:
            desc = {}
        try:
            obj = self._metadataCol(projectName=projectName, resource=resource, dataFormat=dataFormat, type=type, desc=desc).save()
        except ValidationError as e:
            raise ValidationError("Not all of the required fields are delivered "
                                  "or one of the fields type is not proper. %s " % str(e))
        return obj

    def addDocumentFromJSON(self, json_data):
        """
        Adds a document from a JSON string representation.

        Parameters
        ----------
        json_data : str
            A JSON string representing the document.
        """
        self._metadataCol.from_json(json_data).save()

    def deleteDocuments(self, projectName, **query):
        """
        Deletes documents that satisfy the given query.

        Parameters
        ----------
        projectName : str
            The project name.

        query :
            Other query arguments.

        Returns
        -------
        list of dict
            One dictionary (including the id) for every document that was removed.
        """
        deletedDocs = []
        for doc in self.getDocuments(projectName=projectName, **query):
            # Snapshot the document before it is removed.
            deletedDocs.append(doc.asDict(with_id=True))
            doc.delete()

        return deletedDocs

    def deleteDocumentByID(self, id):
        """
        Deletes a document by its ID.

        Parameters
        ----------
        id : str
            The document ID.

        Returns
        -------
        dict
            The record that was deleted (including the id).
        """
        doc = self.getDocumentByID(id=id)
        deletedDoc = doc.asDict(with_id=True)
        doc.delete()

        return deletedDoc

type property

The collection type (e.g. 'Measurements', 'Simulations', 'Cache'), or None for all.

Returns:

Type Description
str or None

__init__(ctype=None, connectionName=None)

Parameters:

Name Type Description Default
ctype str or None

Collection type name (e.g. 'Measurements'). None for all types.

None
connectionName str or None

Optional database connection alias.

None
Source code in hera/datalayer/collection.py
def __init__(self, ctype=None, connectionName=None):
    """
    Store the collection type and bind the matching database object.

    Parameters
    ----------
    ctype : str or None
        Collection type name (e.g. 'Measurements'). None for all types.
    connectionName : str or None
        Optional database connection alias.
    """
    self._type = ctype
    # Without an explicit type the generic 'Metadata' object is used.
    if self.type is None:
        dbObject = getDBObject('Metadata', connectionName)
    else:
        dbObject = getDBObject(ctype, connectionName)
    self._metadataCol = dbObject

getDocumentsAsDict(projectName, with_id=False, **query)

Returns a dict with a single 'documents' key whose value is a list of the documents, each in dict format. The documents in the list are the result of your query.

Parameters:

Name Type Description Default
projectName str

The projectName.

required
with_id bool

Whether the 'id' key should be included in the documents.

False
query

query arguments.

{}

Returns:

Type Description
dict

A dict with a 'documents' key whose value is a list of dicts that represent the documents that fulfill the query.

Source code in hera/datalayer/collection.py
def getDocumentsAsDict(self, projectName, with_id=False, **query):
    """
    Run a query and return the matching documents as plain dictionaries.

    Parameters
    ----------
    projectName : str
        The project name.

    with_id : bool, optional, default False
        Whether the id key should be included in each document dictionary.

    query :
        Query arguments, forwarded to ``getDocuments``.

    Returns
    -------
    dict
        A dict with a single 'documents' key whose value is the list of
        dictionaries that represent the documents that fulfill the query.
    """
    matches = self.getDocuments(projectName=projectName, **query)
    asDicts = []
    for document in matches:
        asDicts.append(document.asDict(with_id=with_id))
    return {"documents": asDicts}

getDocuments(projectName, resource=None, dataFormat=None, type=None, **desc)

Get the documents that satisfy the given query. If projectName is None search over all projects.

Parameters:

Name Type Description Default
projectName str

The project name.

required
resource

The data resource.

None
dataFormat str

The data format.

None
type str

The type which the data belongs to.

None
desc

Other metadata arguments.

{}

Returns:

Type Description
list

List of documents that fulfill the query.

Source code in hera/datalayer/collection.py
def getDocuments(self, projectName, resource=None, dataFormat=None, type=None, **desc):
    """
    Query the collection for documents.

    Every argument that is None is left out of the query, so passing
    ``projectName=None`` searches over all projects.

    Parameters
    ----------
    projectName : str or None
        The project name.

    resource :
        The data resource.

    dataFormat : str
        The data format.
    type : str
        The type which the data belongs to.
    desc :
        Other metadata arguments. They are converted with
        ``ConfigurationToJSON`` and ``dictToMongoQuery`` (prefix "desc")
        before being added to the query.

    Returns
    -------
    The result of ``self._metadataCol.objects(**query)``: an iterable of the
    documents that fulfill the query (presumably a mongoengine QuerySet
    rather than a plain list -- TODO confirm).
    """
    # Fixed document fields, in the order they are added to the query.
    fixedFields = (
        ('resource', resource),
        ('dataFormat', dataFormat),
        ('type', type),
        ('projectName', projectName),
    )
    mongoQuery = {field: value for field, value in fixedFields if value is not None}

    standardizedDesc = ConfigurationToJSON(desc, standardize=True, splitUnits=True, keepOriginalUnits=False)
    mongoQuery.update(dictToMongoQuery(standardizedDesc, prefix="desc"))

    return self._metadataCol.objects(**mongoQuery)

getProjectList()

Returns the list of unique project names in this collection.

Returns:

Type Description
list of str
Source code in hera/datalayer/collection.py
def getProjectList(self):
    """
    List the distinct project names found in this collection.

    Returns
    -------
    list of str
        The value returned by ``_getAllValueByKey`` for the 'projectName' key.
    """
    projectNames = self._getAllValueByKey(key='projectName')
    return projectNames

getDocumentByID(id)

Returns a document by its ID.

Parameters:

Name Type Description Default
id str

The document ID.

required

Returns:

Type Description
document

The document with the relevant ID.

Source code in hera/datalayer/collection.py
def getDocumentByID(self, id):
    """
    Fetch a single document by its ID.

    Parameters
    ----------
    id : str
        The document ID.

    Returns
    -------
    document
        The document with the relevant ID, as returned by
        ``self._metadataCol.objects.get``.
    """
    documentManager = self._metadataCol.objects
    return documentManager.get(id=id)

addDocument(projectName, resource='', dataFormat='string', type='', desc={})

Adds a document to the database.

Parameters:

Name Type Description Default
projectName str

The project to add the document to.

required
resource

The data of the document.

''
dataFormat str

The type of the dataformat. See datahandler for the available types.

'string'
desc dict

Holds any additional fields that describe the document.

{}
type str

The type of the data

''

Returns:

Type Description
mongoengine document
Source code in hera/datalayer/collection.py
def addDocument(self, projectName, resource="", dataFormat="string", type="", desc=None):
    """
    Adds a document to the database.

    Parameters
    ----------
    projectName : str
        The project to add the document to.

    resource :
        The data of the document.

    dataFormat : str
        The type of the dataformat.
        See datahandler for the available types.

    type : str
        The type of the data.

    desc : dict or None
        Holds any additional fields that describe the document.
        None (the default) is stored as a new empty dict.

    Returns
    -------
    mongoengine document
        The value returned by ``save()`` on the new document.

    Raises
    ------
    ValidationError
        If the document fails validation when it is saved.
    """
    # A fresh dict per call: a literal ``{}`` default would be one shared
    # object handed to every document created without an explicit desc.
    if desc is None:
        desc = {}
    try:
        obj = self._metadataCol(projectName=projectName, resource=resource, dataFormat=dataFormat, type=type, desc=desc).save()
    except ValidationError as e:
        raise ValidationError("Not all of the required fields are delivered "
                              "or one of the fields type is not proper. %s " % str(e))
    return obj

addDocumentFromJSON(json_data)

Adds a document from a JSON string representation.

Parameters:

Name Type Description Default
json_data str

A JSON string representing the document.

required
Source code in hera/datalayer/collection.py
def addDocumentFromJSON(self, json_data):
    """
    Build a document from its JSON string representation and save it.

    Parameters
    ----------
    json_data : str
        A JSON string representing the document.
    """
    document = self._metadataCol.from_json(json_data)
    document.save()

deleteDocuments(projectName, **query)

Deletes documents that satisfy the given query.

Parameters:

Name Type Description Default
projectName str

The project name.

required
query

Other query arguments.

{}

Returns:

Type Description
list
A list of dictionaries, one for each document that was removed.
Source code in hera/datalayer/collection.py
def deleteDocuments(self, projectName, **query):
    """
    Delete every document that satisfies the given query.

    Parameters
    ----------
    projectName : str
        The project name.

    query :
        Other query arguments, forwarded to ``getDocuments``.

    Returns
    -------
    list of dict
        One dictionary (including the id) for every document that was removed.
    """
    def _snapshotThenDelete(document):
        # Capture the content first; the document is removed right afterwards.
        snapshot = document.asDict(with_id=True)
        document.delete()
        return snapshot

    return [_snapshotThenDelete(document)
            for document in self.getDocuments(projectName=projectName, **query)]

deleteDocumentByID(id)

Deletes a document by its ID.

Parameters:

Name Type Description Default
id str

The document ID.

required

Returns:

Type Description
dict
The record that was deleted.
Source code in hera/datalayer/collection.py
def deleteDocumentByID(self, id):
    """
    Delete a single document, looked up by its ID.

    Parameters
    ----------
    id : str
        The document ID.

    Returns
    -------
    dict
        The record that was deleted (including the id).
    """
    target = self.getDocumentByID(id=id)
    # Capture the content before the document is removed.
    snapshot = target.asDict(with_id=True)
    target.delete()
    return snapshot

Measurements_Collection

hera.datalayer.collection.Measurements_Collection

Bases: AbstractCollection

Collection that contains measurement documents.

Source code in hera/datalayer/collection.py
class Measurements_Collection(AbstractCollection):
    """
    Collection that contains measurement documents.
    """

    def __init__(self, connectionName=None):
        """
        Parameters
        ----------
        connectionName : str or None
            Optional database connection alias.
        """
        # The explicit two-argument ``super`` form is valid on both Python 2 and
        # Python 3, so no interpreter-version dispatch is needed and the base
        # initializer always runs.
        super(Measurements_Collection, self).__init__(ctype='Measurements', connectionName=connectionName)

__init__(connectionName=None)

Parameters:

Name Type Description Default
connectionName str or None

Optional database connection alias.

None
Source code in hera/datalayer/collection.py
def __init__(self, connectionName=None):
    """
    Parameters
    ----------
    connectionName : str or None
        Optional database connection alias.
    """
    # The explicit two-argument ``super`` form is valid on both Python 2 and
    # Python 3, so no interpreter-version dispatch is needed and the base
    # initializer always runs.
    super(Measurements_Collection, self).__init__(ctype='Measurements', connectionName=connectionName)

Simulations_Collection

hera.datalayer.collection.Simulations_Collection

Bases: AbstractCollection

Collection that contains simulation documents.

Source code in hera/datalayer/collection.py
class Simulations_Collection(AbstractCollection):
    """
    Collection that contains simulation documents.
    """

    def __init__(self, connectionName=None):
        """
        Parameters
        ----------
        connectionName : str or None
            Optional database connection alias.
        """
        # The explicit two-argument ``super`` form is valid on both Python 2 and
        # Python 3, so no interpreter-version dispatch is needed and the base
        # initializer always runs.
        super(Simulations_Collection, self).__init__(ctype='Simulations', connectionName=connectionName)

__init__(connectionName=None)

Parameters:

Name Type Description Default
connectionName str or None

Optional database connection alias.

None
Source code in hera/datalayer/collection.py
def __init__(self, connectionName=None):
    """
    Parameters
    ----------
    connectionName : str or None
        Optional database connection alias.
    """
    # The explicit two-argument ``super`` form is valid on both Python 2 and
    # Python 3, so no interpreter-version dispatch is needed and the base
    # initializer always runs.
    super(Simulations_Collection, self).__init__(ctype='Simulations', connectionName=connectionName)

Cache_Collection

hera.datalayer.collection.Cache_Collection

Bases: AbstractCollection

Collection that contains cache documents.

Source code in hera/datalayer/collection.py
class Cache_Collection(AbstractCollection):
    """
    Collection that contains cache documents.
    """

    def __init__(self, connectionName=None):
        """
        Parameters
        ----------
        connectionName : str or None
            Optional database connection alias.
        """
        # The explicit two-argument ``super`` form is valid on both Python 2 and
        # Python 3, so no interpreter-version dispatch is needed and the base
        # initializer always runs.
        super(Cache_Collection, self).__init__(ctype='Cache', connectionName=connectionName)

__init__(connectionName=None)

Parameters:

Name Type Description Default
connectionName str or None

Optional database connection alias.

None
Source code in hera/datalayer/collection.py
def __init__(self, connectionName=None):
    """
    Parameters
    ----------
    connectionName : str or None
        Optional database connection alias.
    """
    # The explicit two-argument ``super`` form is valid on both Python 2 and
    # Python 3, so no interpreter-version dispatch is needed and the base
    # initializer always runs.
    super(Cache_Collection, self).__init__(ctype='Cache', connectionName=connectionName)

Data Handlers

datatypes

hera.datalayer.datahandler.datatypes

Registry of supported data format constants and dispatch logic for data handlers.

Each constant (e.g. STRING, PARQUET, HDF) identifies a data format. Use getHandler(formatName) to retrieve the corresponding DataHandler_* class, or getDataFormatName(obj) to auto-detect the format from a Python object.

Source code in hera/datalayer/datahandler.py
class datatypes:
    """
    Registry of supported data format constants and dispatch logic for data handlers.

    Each constant (e.g. ``STRING``, ``PARQUET``, ``HDF``) identifies a data format.
    Use ``getHandler(formatName)`` to retrieve the corresponding ``DataHandler_*`` class,
    or ``getDataFormatName(obj)`` to auto-detect the format from a Python object.
    """
    STRING = "string"
    TIME = "time"
    CSV_PANDAS = "csv_pandas"
    HDF = "HDF"
    NETCDF_XARRAY = "netcdf_xarray"
    ZARR_XARRAY = "zarr_xarray"
    JSON_DICT = "JSON_dict"
    JSON_PANDAS = "JSON_pandas"
    JSON_GEOPANDAS = "JSON_geopandas"
    GEOPANDAS = "geopandas"
    GEOTIFF = "geotiff"
    PARQUET = "parquet"
    IMAGE = "image"
    PICKLE = "pickle"
    DICT = "dict"
    NUMPY_ARRAY = "numpy_array"
    NUMPY_DICT_ARRAY = "numpy_dict_array"  # A dict of numpy arrays, no automatic detection.
    CLASS = "Class"

    @staticmethod
    def get_obj_or_instance_fullName(obj):
        """
        Returns the fully qualified name of a class or instance, including its module.

        Builtin types are returned without the ``builtins.`` prefix.

        Examples:
            >>> datatypes.get_obj_or_instance_fullName(int)
            'int'

            >>> datatypes.get_obj_or_instance_fullName({})
            'dict'
        """
        cls = obj if isinstance(obj, type) else obj.__class__

        module = cls.__module__
        qualname = cls.__qualname__

        if module == "builtins":
            return qualname  # No need to show 'builtins' for int, str, etc.
        return f"{module}.{qualname}"

    # Maps a fully qualified type name to its data format name and file extension.
    # The "object" record is the fallback for any type that is not listed.
    typeDatatypeMap = {
        "str": dict(typeName=STRING, ext="txt"),
        "pandas.core.frame.DataFrame": dict(typeName=PARQUET, ext="parquet"),
        'pandas.core.series.Series': dict(typeName=JSON_PANDAS, ext="json"),
        "dask_expr._collection.DataFrame": dict(typeName=PARQUET, ext="parquet"),
        'geopandas.geodataframe.GeoDataFrame': dict(typeName=GEOPANDAS, ext="gpkg"),
        'xarray.core.dataarray.DataArray': dict(typeName=ZARR_XARRAY, ext="zarr"),
        "dict": dict(typeName=PICKLE, ext="pckle"),
        "list": dict(typeName=PICKLE, ext="pckle"),
        "bytes": dict(typeName=PICKLE, ext="pckle"),
        "object": dict(typeName=PICKLE, ext="pckle"),
        "numpy.ndarray": dict(typeName=NUMPY_ARRAY, ext="npy")
    }

    @staticmethod
    def _getDatatypeRecord(obj_or_class):
        """Return the ``typeDatatypeMap`` record for the object, or the "object" record if its type is not listed."""
        objTypeName = datatypes.get_obj_or_instance_fullName(obj_or_class)
        return datatypes.typeDatatypeMap.get(objTypeName, datatypes.typeDatatypeMap["object"])

    @staticmethod
    def getDataFormatName(obj_or_class):
        """
        Find the hera data format name for the object.
        If the type is not registered, the general "object" record is used.

        Parameters
        ----------
        obj_or_class : object or type.

        Returns
        -------
        str
            The ``typeName`` string that identifies the datahandler.
        """
        return datatypes._getDatatypeRecord(obj_or_class)["typeName"]

    @staticmethod
    def getDataFormatExtension(obj_or_class):
        """
        Find the file name extension used for the object's data format.
        If the type is not registered, the general "object" record is used.

        Parameters
        ----------
        obj_or_class : object or type.

        Returns
        -------
        str
            The ``ext`` string: the extension of the file name.
        """
        return datatypes._getDatatypeRecord(obj_or_class)["ext"]

    @staticmethod
    def guessHandler(obj_or_class):
        """
        Auto-detect the data format and return the appropriate handler class.

        Parameters
        ----------
        obj_or_class : object or type
            The data object or class to detect the format for.

        Returns
        -------
        DataHandler class
            The handler class for the detected format.
        """
        dataTypeName = datatypes.getDataFormatName(obj_or_class)

        return datatypes.getHandler(objectType=dataTypeName)

    @staticmethod
    def getHandler(objectType):
        """
        Return the DataHandler class for the given data format name.

        The handler is the attribute named ``DataHandler_<objectType>`` of the
        ``hera.datalayer.datahandler`` module.

        Parameters
        ----------
        objectType : str
            A data format name (e.g. ``datatypes.PARQUET``).

        Returns
        -------
        DataHandler class

        Raises
        ------
        ValueError
            If no handler exists for the given type.
        """
        dataHandlerModule = importlib.import_module("hera.datalayer.datahandler")

        handlerName = f"DataHandler_{objectType}"

        if not hasattr(dataHandlerModule, handlerName):
            raise ValueError(f"The data handler for the type {objectType} is not known")

        return getattr(dataHandlerModule, handlerName)

get_obj_or_instance_fullName(obj) staticmethod

Returns the fully qualified name of a class or instance, including its module.

Examples:

>>> datatypes.get_obj_or_instance_fullName(SomeClass)
'package.module.SomeClass'

>>> datatypes.get_obj_or_instance_fullName(SomeClass())
'package.module.SomeClass'
Source code in hera/datalayer/datahandler.py
@staticmethod
def get_obj_or_instance_fullName(obj):
    """
    Returns the fully qualified name of a class or instance, including its module.

    Examples:
        >>> get_full_name(SomeClass)
        'package.module.SomeClass'

        >>> get_full_name(SomeClass())
        'package.module.SomeClass'
    """
    # If it's a class
    if isinstance(obj, type):
        cls = obj
    else:
        cls = obj.__class__

    module = cls.__module__
    qualname = cls.__qualname__

    if module == "builtins":
        return qualname  # No need to show 'builtins' for int, str, etc.
    return f"{module}.{qualname}"

getDataFormatName(obj_or_class) staticmethod

Tries to find the hera datatype name for the object.
If it cannot be found, the general "object" type is used.

Parameters:

Name Type Description Default
obj_or_class object or type.
required

Returns:

Type Description
str
The format name (typeName) that identifies the datahandler for the object.
Source code in hera/datalayer/datahandler.py
@staticmethod
def getDataFormatName(obj_or_class):
    """
    Find the hera data format name for the object.
    If the type is not registered, the general "object" record is used.

    Parameters
    ----------
    obj_or_class : object or type.

    Returns
    -------
    str
        The ``typeName`` string that identifies the datahandler.
    """
    registry = datatypes.typeDatatypeMap
    fullName = datatypes.get_obj_or_instance_fullName(obj_or_class)

    try:
        record = registry[fullName]
    except KeyError:
        # Unregistered type: fall back to the general "object" record.
        record = registry["object"]

    return record["typeName"]

getDataFormatExtension(obj_or_class) staticmethod

Tries to find the file name extension that hera uses for the object's datatype.
If it cannot be found, the general "object" type is used.

Parameters:

Name Type Description Default
obj_or_class object or type.
required

Returns:

Type Description
str
The extension (ext) of the file name for the object's data format.
Source code in hera/datalayer/datahandler.py
@staticmethod
def getDataFormatExtension(obj_or_class):
    """
    Find the file name extension used for the object's data format.
    If the type is not registered, the general "object" record is used.

    Parameters
    ----------
    obj_or_class : object or type.

    Returns
    -------
    str
        The ``ext`` string: the extension of the file name.
    """
    registry = datatypes.typeDatatypeMap
    fullName = datatypes.get_obj_or_instance_fullName(obj_or_class)

    try:
        record = registry[fullName]
    except KeyError:
        # Unregistered type: fall back to the general "object" record.
        record = registry["object"]

    return record["ext"]

guessHandler(obj_or_class) staticmethod

Auto-detect the data format and return the appropriate handler class.

Parameters:

Name Type Description Default
obj_or_class object or type

The data object or class to detect the format for.

required

Returns:

Type Description
DataHandler class

The handler class for the detected format.

Source code in hera/datalayer/datahandler.py
@staticmethod
def guessHandler(obj_or_class):
    """
    Detect the data format of an object (or class) and return its handler class.

    Parameters
    ----------
    obj_or_class : object or type
        The data object or class to detect the format for.

    Returns
    -------
    DataHandler class
        The handler class for the detected format.
    """
    detectedFormat = datatypes.getDataFormatName(obj_or_class)
    handlerClass = datatypes.getHandler(objectType=detectedFormat)
    return handlerClass

getHandler(objectType) staticmethod

Return the DataHandler class for the given data format name.

Parameters:

Name Type Description Default
objectType str

A data format name (e.g. datatypes.PARQUET).

required

Returns:

Type Description
DataHandler class

Raises:

Type Description
ValueError

If no handler exists for the given type.

Source code in hera/datalayer/datahandler.py
@staticmethod
def getHandler(objectType):
    """
    Return the DataHandler class for the given data format name.

    Parameters
    ----------
    objectType : str
        A data format name (e.g. ``datatypes.PARQUET``).

    Returns
    -------
    DataHandler class

    Raises
    ------
    ValueError
        If no handler exists for the given type.
    """
    dataHandlerModule = importlib.import_module("hera.datalayer.datahandler")

    handlerName = f"DataHandler_{objectType}"

    if not hasattr(dataHandlerModule, handlerName):
        raise ValueError(f"The data handler for the type {objectType} is not known")

    return getattr(dataHandlerModule, handlerName)

Documents

MetadataFrame

hera.datalayer.document.metadataDocument.MetadataFrame

Bases: object

A basic structure for a document.

Each document is related to a project and described by the following fields:

  • type : str : The type of the document. This is a helper attribute that is used to query the data.

  • resource: str: The resource that the document represents. This can be either path to a file on the disk or the data itself.

  • dataFormat : str: The format of the data. Taken from the datatypes class (hera.datalayer.datahandler.datatypes).

  • desc: dict: A dictionary of arbitrary format that holds the metadata of the record.

  • id : str : The id of the record in the DB.

Source code in hera/datalayer/document/metadataDocument.py
class MetadataFrame(object):
    """
    A basic structure for a document.

    Each document is related to a project and described by the following fields:

    - type : str : The type of the document.
                   This is a helper attribute that is used to query the data.

    - resource: str: The resource that the document represents.
                     This can be either path to a file on the disk or the data itself.

    - dataFormat : str: The format of the data. Taken from the ``datatypes`` class.

    - desc: dict: A dictionary of arbitrary format that holds the metadata of the record.

    - id : str : The id of the record in the DB.

    """
    projectName = StringField(required=True)
    desc = DictField(required=False)
    type = StringField(required=True)
    resource = DynamicField(required=True)
    dataFormat = StringField(required=True)

    def asDict(self, with_id=False):
        """
        Convert the document to a plain dictionary.

        Parameters
        ----------
        with_id : bool, optional
            If True, include the ``_id`` key. Default is False.

        Returns
        -------
        dict
            Dictionary representation of the document, parsed from ``to_json()``.
        """
        docDict = json.loads(self.to_json())
        if not with_id:
            # Tolerate JSON that has no '_id' key instead of raising KeyError.
            docDict.pop('_id', None)
        # NOTE: a '_cls' key (if present) is left in the dictionary.
        return docDict

    def getData(self, **kwargs):
        """
        Returns the data of the document.

        The stored ``desc["storeParameters"]`` (if any) are merged with the
        kwargs, with the kwargs taking precedence, and passed to the datahandler.
        See the datahandler class for your specific datatype.

        Parameters
        ----------
        kwargs : dict
            Extra parameters for the datahandler.

        Returns
        -------
            object according to the datahandler.
        """
        # Work on a copy so per-call kwargs are never written back into
        # self.desc["storeParameters"].
        storeParametersDict = dict(self.desc.get("storeParameters", {}))
        storeParametersDict.update(kwargs)
        return getHandler(self.dataFormat).getData(resource=self.resource, desc=self.desc, **storeParametersDict)

    def __str__(self):
        """Return a pretty-printed JSON representation of the document (without the id)."""
        return json.dumps(self.asDict(with_id=False), indent=4)

asDict(with_id=False)

Convert the document to a plain dictionary.

Parameters:

Name Type Description Default
with_id bool

If True, include the _id key. Default is False.

False

Returns:

Type Description
dict

Dictionary representation of the document.

Source code in hera/datalayer/document/metadataDocument.py
def asDict(self, with_id=False):
    """
    Convert the document to a plain dictionary.

    The document is serialized with ``to_json`` and parsed back with
    ``json.loads``, so the result contains only JSON-compatible values.

    Parameters
    ----------
    with_id : bool, optional
        If True, keep the ``_id`` key. Default is False.

    Returns
    -------
    dict
        Dictionary representation of the document.
    """
    docDict = json.loads(self.to_json())
    if not with_id:
        # Use a default so a serialized document that carries no '_id'
        # key does not raise KeyError.
        docDict.pop('_id', None)
    return docDict

getData(**kwargs)

Returns the data of the document.

The kwargs are passed on to the data handler. See the data handler class for your specific data type.

Parameters:

Name Type Description Default
kwargs dict
{}

Returns:

Type Description
object according to the datahandler.
Source code in hera/datalayer/document/metadataDocument.py
def getData(self, **kwargs):
    """
    Returns the data of the document.

    The handler is selected with ``getHandler(self.dataFormat)``. The
    keyword arguments passed to it are the ``storeParameters`` entry of
    ``desc`` (if present), overridden by ``kwargs``.
    See the datahandler class for your specific datatype.

    Parameters
    ----------
    kwargs : dict
        Per-call overrides/additions to the stored ``storeParameters``.

    Returns
    -------
        object according to the datahandler.
    """
    # Work on a copy: updating the stored dict in place would leak the
    # per-call kwargs into ``desc['storeParameters']`` for later calls.
    storeParametersDict = dict(self.desc.get("storeParameters", {}))
    storeParametersDict.update(kwargs)
    return getHandler(self.dataFormat).getData(resource=self.resource,desc=self.desc, **storeParametersDict)

__str__()

Return a pretty-printed JSON representation of the document.

Source code in hera/datalayer/document/metadataDocument.py
def __str__(self):
    """Render the document (without its ``_id``) as JSON indented by 4 spaces."""
    plainDict = self.asDict(with_id=False)
    return json.dumps(plainDict, indent=4)

nonDBMetadataFrame

hera.datalayer.document.metadataDocument.nonDBMetadataFrame

Bases: object

A wrapper class to use when the data is not loaded into the DB.

This class will be used when getting data from local files.

Source code in hera/datalayer/document/metadataDocument.py
class nonDBMetadataFrame(object):
    """
        Stand-in for a metadata document when the data is not loaded
        into the DB.

        Used when getting data from local files: it carries the same
        descriptive attributes and hands back the wrapped data as-is.
    """
    _data = None

    def __init__(self, data, projectName=None, type=None, resource=None, dataFormat=None, **desc):
        """
        Store the wrapped data together with its descriptive attributes.

        Parameters
        ----------
        data : object
            The data to wrap.
        projectName : str, optional
            The project name.
        type : str, optional
            The document type.
        resource : str, optional
            The resource path or identifier.
        dataFormat : str, optional
            The data format name.
        desc : dict
            Any further keyword arguments, kept as the ``desc`` dictionary.
        """
        self.projectName, self.type = projectName, type
        self.resource, self.dataFormat = resource, dataFormat
        self.desc = desc

        self._data = data

    def getData(self, **kwargs):
        """
        Hand back the wrapped data; ``kwargs`` are accepted but ignored.

        Returns
        -------
        object
            The data passed at initialization.
        """
        wrapped = self._data
        return wrapped

    def __getitem__(self, item):
        """
        Look up an instance attribute by name, dictionary-style.

        Parameters
        ----------
        item : str
            The attribute name (e.g. ``'resource'`` or ``'desc'``).

        Returns
        -------
        object
            The attribute value. A missing name raises ``KeyError``.
        """
        attributes = vars(self)
        return attributes[item]

__init__(data, projectName=None, type=None, resource=None, dataFormat=None, **desc)

Initialize a non-database metadata frame.

Parameters:

Name Type Description Default
data object

The data to wrap.

required
projectName str

The project name.

None
type str

The document type.

None
resource str

The resource path or identifier.

None
dataFormat str

The data format name.

None
desc dict

Additional metadata fields.

{}
Source code in hera/datalayer/document/metadataDocument.py
def __init__(self, data, projectName=None, type=None, resource=None, dataFormat=None, **desc):
    """
    Store the wrapped data together with its descriptive attributes.

    Parameters
    ----------
    data : object
        The data to wrap.
    projectName : str, optional
        The project name.
    type : str, optional
        The document type.
    resource : str, optional
        The resource path or identifier.
    dataFormat : str, optional
        The data format name.
    desc : dict
        Any further keyword arguments, kept as the ``desc`` dictionary.
    """
    self.projectName, self.type = projectName, type
    self.resource, self.dataFormat = resource, dataFormat
    self.desc = desc

    self._data = data

getData(**kwargs)

Return the wrapped data object.

Returns:

Type Description
object

The data passed at initialization.

Source code in hera/datalayer/document/metadataDocument.py
def getData(self, **kwargs):
    """
    Hand back the wrapped data; ``kwargs`` are accepted but ignored.

    Returns
    -------
    object
        The data passed at initialization.
    """
    wrapped = self._data
    return wrapped

__getitem__(item)

Access document attributes by key.

Parameters:

Name Type Description Default
item str

The attribute name.

required

Returns:

Type Description
object
Source code in hera/datalayer/document/metadataDocument.py
def __getitem__(self, item):
    """
    Look up an instance attribute by name, dictionary-style.

    Parameters
    ----------
    item : str
        The attribute name.

    Returns
    -------
    object
        The attribute value. A missing name raises ``KeyError``.
    """
    attributes = vars(self)
    return attributes[item]

Caching

hera.datalayer.autocache

cacheDecorators

Internal implementation of the function caching mechanism.

Wraps a function call, serializes its arguments, checks the database for a cached result, and stores the result if not already cached.

Source code in hera/datalayer/autocache.py
class cacheDecorators:
    """
    Internal implementation of the function caching mechanism.

    Wraps a function call, serializes its arguments, checks the database for
    a cached result, and stores the result if not already cached.
    """

    prepareFunction = None
    postProcessFunction = None
    projectName = None
    dataFormat = None

    @staticmethod
    def is_mongo_serializable(obj):
        """Check whether ``obj`` can be BSON-encoded for MongoDB storage."""
        try:
            # BSON expects a dict at the top level
            BSON.encode({'test': obj})
            return True
        except InvalidDocument:
            return False

    @staticmethod
    def obj_to_txt(obj):
        """Serialize ``obj`` to a base64-encoded ASCII string via pickle."""
        message_bytes = pickle.dumps(obj)
        base64_bytes = base64.b64encode(message_bytes)
        return base64_bytes.decode('ascii')

    @staticmethod
    def txt_to_obj(txt):
        """
        Deserialize an object from a base64-encoded text string.

        SECURITY: this unpickles the decoded bytes; unpickling can execute
        arbitrary code, so only pass text that comes from a trusted source.
        """
        base64_bytes = txt.encode('ascii')
        message_bytes = base64.b64decode(base64_bytes)
        return pickle.loads(message_bytes)

    def __init__(self, func,dataFormat,projectName = None,postProcessFunction=None,getDataParams={},storeDataParams={}):
        """
        Parameters
        ----------
        func : callable
            The function whose results are cached.
        dataFormat : str or None
            Storage format for the cached data.
        projectName : str or None
            Project that owns the cache collection.
        postProcessFunction : callable or None
            Optional transform applied to the result before returning.
        getDataParams : dict
            Extra keyword arguments forwarded to ``getData``.
        storeDataParams : dict
            Extra keyword arguments intended for saving data.
        """
        self.func = func
        self.postProcessFunction = postProcessFunction
        self.projectName = projectName
        # Copy the parameter dicts: the ``{}`` defaults are created once at
        # definition time, so storing them directly would share one dict
        # between every instance that relies on the default.
        self.getDataParams = dict(getDataParams) if getDataParams else {}
        # NOTE(review): storeDataParams is stored here but no method of this
        # class reads it - confirm whether saveFunctionCache should forward it.
        self.storeDataParams = dict(storeDataParams) if storeDataParams else {}
        self.dataFormat = dataFormat

    def __call__(self, *args, **kwargs):
        """
        Execute the function, returning a cached result when available.

        The call arguments are bound to the function signature (defaults
        applied), converted with ``ConfigurationToJSON`` and used, together
        with the function name, as the cache query. On a miss the function
        is executed and a non-None result is saved. ``postProcessFunction``
        (if any) is applied to the value before it is returned.
        """
        sig = inspect.signature(self.func)

        # Bind the passed args and kwargs to the signature
        bound = sig.bind(*args, **kwargs)

        # Apply defaults to fill in any missing optional arguments
        bound.apply_defaults()

        call_info = dict(bound.arguments)

        # Store a method's instance under the key 'context' instead of 'self'.
        if 'self' in call_info:
            call_info['context'] = call_info.pop('self')

        # Convert any pint/unum to standardized MKS and a dict with the magnitude and units separated.
        # This allows querying the cache even when the values are given in different units.
        call_info_JSON = ConfigurationToJSON(call_info, standardize=True, splitUnits=True, keepOriginalUnits=True)

        call_info_serialized = dict()

        # Each argument is stored as (is_plain, value): values BSON cannot
        # encode are replaced by their pickled/base64 text form.
        for key,value in call_info_JSON.items():
            serializable = cacheDecorators.is_mongo_serializable(value)
            serialized_value = value if serializable else cacheDecorators.obj_to_txt(value)
            call_info_serialized[key] = (serializable,serialized_value)

        # Add the function name
        call_info_serialized['functionName'] =  self._get_full_func_name(self.func)

        data = self.checkIfFunctionIsCached(call_info_serialized)
        if data is None:
            data = self.func(*args, **kwargs)
            # A None result is never stored, so such calls are re-executed every time.
            if data is not None:
                self.saveFunctionCache(call_info_serialized,data)

        ret = data if self.postProcessFunction is None else self.postProcessFunction(data)
        return ret

    def _get_full_func_name(self,func):
        """
        Return the name under which the function is stored in the cache.

        Bound methods give ``module.Class.method`` (``Class.method`` when the
        module is ``__main__``); plain functions give only ``__qualname__``;
        any other callable gives ``module.qualname``.

        NOTE(review): plain functions are stored without their module, so
        same-named functions in different modules share a cache name.
        Changing this would orphan existing cache entries - confirm intent.

        Raises
        ------
        TypeError
            If ``func`` is not callable.
        """
        if not callable(func):
            raise TypeError("Provided object is not callable.")

        # Handle bound methods by unwrapping them
        if inspect.ismethod(func):
            # Get the class of the bound instance
            cls = func.__self__.__class__
            method_name = func.__name__
            class_qualname = cls.__qualname__
            module = func.__module__
            if module == "__main__":
                ret = f"{class_qualname}.{method_name}"
            else:
                ret = f"{module}.{class_qualname}.{method_name}"

        elif inspect.isfunction(func):
            ret = func.__qualname__
        else:
            # Any other callable (e.g. builtins)
            qualname = func.__qualname__
            module = func.__module__
            ret = f"{module}.{qualname}"

        return ret


    def checkIfFunctionIsCached(self,call_info):
        """
            Check if the function and the parameters are stored in the DB.
        Parameters
        ----------
        call_info : dict
            The cache query: the serialized call arguments plus the
            'functionName' key, passed as keyword filters to
            ``getCacheDocuments``.

        Returns
        -------
            None if no matching cache document exists,
            otherwise the data of the first matching document, loaded
            with ``getDataParams``.
        """

        proj = Project(self.projectName)
        docList = proj.getCacheDocuments(type="functionCacheData",**call_info)
        return None if len(docList)==0 else docList[0].getData(**self.getDataParams)

    def saveFunctionCache(self,call_info,data):
        """
            Save the function result as a cache document of the project.
        Parameters
        ----------
        call_info : dict
            The cache query built for the call; stored as the document
            ``desc``. Its 'functionName' entry is used as the cache name.
        data
            The function result to store.

        Returns
        -------
            Whatever ``Project.saveCacheData`` returns.
        """
        proj = Project(self.projectName)
        return proj.saveCacheData(name=call_info['functionName'], data=data, desc=call_info, type="functionCacheData",dataFormat=self.dataFormat)
is_mongo_serializable(obj) staticmethod

Check whether obj can be BSON-encoded for MongoDB storage.

Source code in hera/datalayer/autocache.py
@staticmethod
def is_mongo_serializable(obj):
    """Tell whether ``obj`` can be BSON-encoded (True) or not (False)."""
    # BSON documents are dicts at the top level, so wrap the value first.
    probe = {'test': obj}
    try:
        BSON.encode(probe)
    except InvalidDocument:
        return False
    return True
obj_to_txt(obj) staticmethod

Serialize obj to a base64-encoded text string via pickle.

Source code in hera/datalayer/autocache.py
@staticmethod
def obj_to_txt(obj):
    """Pickle ``obj`` and return the bytes as base64-encoded ASCII text."""
    pickled = pickle.dumps(obj)
    return base64.b64encode(pickled).decode('ascii')
txt_to_obj(txt) staticmethod

Deserialize an object from a base64-encoded text string.

Source code in hera/datalayer/autocache.py
@staticmethod
def txt_to_obj(txt):
    """
    Rebuild an object from base64-encoded text holding its pickle.

    SECURITY: the decoded bytes are unpickled, which can execute arbitrary
    code; only pass text that comes from a trusted source.
    """
    pickled = base64.b64decode(txt.encode('ascii'))
    return pickle.loads(pickled)
__init__(func, dataFormat, projectName=None, postProcessFunction=None, getDataParams={}, storeDataParams={})

Parameters:

Name Type Description Default
func callable

The function whose results are cached.

required
dataFormat str or None

Storage format for the cached data.

required
projectName str or None

Project that owns the cache collection.

None
postProcessFunction callable or None

Optional transform applied to the result before returning.

None
getDataParams dict

Extra keyword arguments forwarded to getData.

{}
storeDataParams dict

Extra keyword arguments forwarded when saving data.

{}
Source code in hera/datalayer/autocache.py
def __init__(self, func,dataFormat,projectName = None,postProcessFunction=None,getDataParams={},storeDataParams={}):
    """
    Parameters
    ----------
    func : callable
        The function whose results are cached.
    dataFormat : str or None
        Storage format for the cached data.
    projectName : str or None
        Project that owns the cache collection.
    postProcessFunction : callable or None
        Optional transform applied to the result before returning.
    getDataParams : dict
        Extra keyword arguments forwarded to ``getData``.
    storeDataParams : dict
        Extra keyword arguments forwarded when saving data.
    """
    self.func = func
    self.postProcessFunction = postProcessFunction
    self.projectName = projectName
    # Copy the parameter dicts: the ``{}`` defaults are created once at
    # definition time, so storing them directly would share one dict
    # between every instance that relies on the default.
    self.getDataParams = dict(getDataParams) if getDataParams else {}
    self.storeDataParams = dict(storeDataParams) if storeDataParams else {}
    self.dataFormat = dataFormat
__call__(*args, **kwargs)

Execute the function, returning a cached result when available.

Source code in hera/datalayer/autocache.py
def __call__(self, *args, **kwargs):
    """
    Run the wrapped function, or return its cached result when one exists.

    The call arguments are bound to the function signature (defaults
    applied), converted with ``ConfigurationToJSON`` and used, together
    with the function name, as the cache query. On a miss the function is
    executed and a non-None result is saved. ``postProcessFunction`` (if
    any) is applied to the value before it is returned.
    """
    # Resolve positional/keyword arguments to parameter names, defaults included.
    bound_args = inspect.signature(self.func).bind(*args, **kwargs)
    bound_args.apply_defaults()
    call_info = dict(bound_args.arguments)

    # Store a method's instance under the key 'context' instead of 'self'.
    if 'self' in call_info:
        call_info['context'] = call_info.pop('self')

    # Standardize pint/unum values to MKS with magnitude and units split,
    # so the cache can be queried with values given in different units.
    call_info_JSON = ConfigurationToJSON(call_info, standardize=True, splitUnits=True, keepOriginalUnits=True)

    # Each argument is stored as (is_plain, value): values BSON cannot
    # encode are replaced by their pickled/base64 text form.
    call_info_serialized = {}
    for name, value in call_info_JSON.items():
        if cacheDecorators.is_mongo_serializable(value):
            call_info_serialized[name] = (True, value)
        else:
            call_info_serialized[name] = (False, cacheDecorators.obj_to_txt(value))

    call_info_serialized['functionName'] = self._get_full_func_name(self.func)

    data = self.checkIfFunctionIsCached(call_info_serialized)
    if data is None:
        data = self.func(*args, **kwargs)
        # A None result is never stored, so such calls are re-executed every time.
        if data is not None:
            self.saveFunctionCache(call_info_serialized, data)

    if self.postProcessFunction is None:
        return data
    return self.postProcessFunction(data)
checkIfFunctionIsCached(call_info)
Check if the function and the parameters are stored in the DB.

Parameters:

Name Type Description Default
call_info dict

A dict describing the function call: it holds the function name (under the key functionName) and the serialized call arguments.

required

Returns:

Type Description
None if the data does not exist,

the data otherwise.

Source code in hera/datalayer/autocache.py
def checkIfFunctionIsCached(self,call_info):
    """
        Look up a cached result for the given function call.
    Parameters
    ----------
    call_info : dict
        The cache query; its items are passed as keyword filters to
        ``getCacheDocuments`` together with type="functionCacheData".

    Returns
    -------
        None if no matching cache document exists,
        otherwise the data of the first matching document, loaded with
        ``getDataParams``.
    """
    cachedDocs = Project(self.projectName).getCacheDocuments(type="functionCacheData", **call_info)
    if len(cachedDocs) == 0:
        return None
    return cachedDocs[0].getData(**self.getDataParams)
saveFunctionCache(call_info, data)
Save the data to the disk.

Parameters:

Name Type Description Default
data
required
Source code in hera/datalayer/autocache.py
def saveFunctionCache(self,call_info,data):
    """
        Save the function result as a cache document of the project.
    Parameters
    ----------
    call_info : dict
        The cache query built for the call; stored as the document
        ``desc``. Its 'functionName' entry is used as the cache name.
    data
        The function result to store.

    Returns
    -------
        Whatever ``Project.saveCacheData`` returns.
    """
    project = Project(self.projectName)
    cacheName = call_info['functionName']
    return project.saveCacheData(
        name=cacheName,
        data=data,
        desc=call_info,
        type="functionCacheData",
        dataFormat=self.dataFormat,
    )

clearAllFunctionsCache(projectName=None)

Remove the cache of all functions.

Parameters:

Name Type Description Default
projectName
None
Source code in hera/datalayer/autocache.py
def clearAllFunctionsCache(projectName=None):
    """
        Remove the cache of all functions.

        Delegates to ``clearFunctionCache`` with no function name, so the
        deletion is not restricted to one function.
    Parameters
    ----------
    projectName : str or None
        Passed through to ``clearFunctionCache``.

    Returns
    -------
    None
    """
    # functionName=None means "do not filter by function".
    clearFunctionCache(projectName=projectName, functionName=None)

clearFunctionCache(functionName, projectName=None)

Removes all cache documents of the function and deletes the data they point to from the disk.

Parameters:

Name Type Description Default
functionName str

The name of the function

required
projectName str

The name of the project that holds the cache. If None, load the name from the caseConfiguration.

None
Source code in hera/datalayer/autocache.py
def clearFunctionCache(functionName,projectName=None):
    """
        Delete the cache documents of a function and the data they point to on disk.
    Parameters
    ----------
    functionName : str or None
        The name of the function. None removes the cache documents of
        every function.
    projectName : str
        The name of the project that holds the cache. If None, load the name from the caseConfiguration.

    Returns
    -------
    bool
        Always True.
    """
    project = Project(projectName=projectName)
    # Filter by function name only when one was given.
    query = {} if functionName is None else {'functionName': functionName}

    for deleted in project.deleteCacheDocuments(type ="functionCacheData", **query):
        resource = deleted['resource']
        if not os.path.exists(resource):
            continue
        # The resource may be a directory or a single file.
        if os.path.isdir(resource):
            shutil.rmtree(resource)
        else:
            os.remove(resource)

    return True

cacheFunction(_func=None, *, returnFormat=None, projectName=None, postProcessFunction=None, getDataParams={}, storeDataParams={})

Decorator that caches a function's return value in the project database.

On first call, the function executes and its result is saved as a cache document. On subsequent calls with the same arguments, the cached result is returned instead of re-executing the function.

Can be used with or without arguments::

@cacheFunction
def my_func(x):
    ...

@cacheFunction(returnFormat=datatypes.PARQUET, projectName="myproject")
def my_func(x):
    ...

Parameters:

Name Type Description Default
returnFormat str

The data format to use when storing the result. If None, auto-detected.

None
projectName str

The project to store the cache in. If None, loaded from caseConfiguration.

None
postProcessFunction callable

A function applied to the result before returning it.

None
getDataParams dict

Extra keyword arguments passed to getData when loading from cache.

{}
storeDataParams dict

Extra keyword arguments passed when saving to cache.

{}
Source code in hera/datalayer/autocache.py
def cacheFunction(_func=None, *, returnFormat=None, projectName=None, postProcessFunction=None, getDataParams={},storeDataParams={}):
    """
    Decorator that routes every call of a function through ``cacheDecorators``.

    Each call of the decorated function builds a ``cacheDecorators`` object
    with the options given here and invokes it with the call arguments, so
    a result already cached for the same arguments is returned instead of
    re-executing the function.

    Can be used with or without arguments::

        @cacheFunction
        def my_func(x):
            ...

        @cacheFunction(returnFormat=datatypes.PARQUET, projectName="myproject")
        def my_func(x):
            ...

    Parameters
    ----------
    returnFormat : str, optional
        The data format used when storing the result; passed on as
        ``dataFormat`` (None is passed through unchanged).
    projectName : str, optional
        The project to store the cache in; passed through unchanged.
    postProcessFunction : callable, optional
        A function applied to the result before returning it.
    getDataParams : dict, optional
        Extra keyword arguments passed to ``getData`` when loading from cache.
    storeDataParams : dict, optional
        Extra keyword arguments intended for saving to cache.
    """
    def decorator(func):
        """Build the caching wrapper around ``func``."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            """Create the cache handler for this call and run it."""
            cachedCall = cacheDecorators(
                func=func,
                dataFormat=returnFormat,
                projectName=projectName,
                postProcessFunction=postProcessFunction,
                getDataParams=getDataParams,
                storeDataParams=storeDataParams
            )
            return cachedCall(*args, **kwargs)
        return wrapper

    # With parentheses (@cacheFunction(...)) no function is given yet, so
    # return the decorator; bare usage (@cacheFunction) wraps it directly.
    return decorator if _func is None else decorator(_func)