
Data Layer

This page covers the MongoDB document model, the datatypes system for format dispatch, and the repository JSON pipeline that loads data into projects.


MongoDB Document Model

Hera stores all metadata in MongoDB using a single base model (Metadata) with three subtypes. Each document represents a pointer to data — the actual data lives on disk (or inline for small values).

Diagram: each collection class "manages" its document subtype — Measurements_Collection ..> Measurements, Simulations_Collection ..> Simulations, Cache_Collection ..> Cache.

Document Fields

Field Type Description
projectName str The project this document belongs to
_cls str Discriminator: "Metadata.Measurements", "Metadata.Simulations", or "Metadata.Cache"
type str Application-defined type tag (e.g., "ToolkitDataSource", "Experiment_rawData")
resource str Path to the data file on disk, or inline value for small data
dataFormat str One of the datatypes constants (see below)
desc dict Free-form metadata dictionary — toolkit name, version, parameters, etc.

Collection Architecture

Each collection type wraps a MongoEngine document class and provides the standard CRUD interface:

Diagram: GetMeas delegates to GetDoc and DelMeas to DelDoc; AddDoc, GetDoc, and DelDoc all operate on the underlying Metadata collection (MetadataCol).

Three Parallel APIs

The Project class exposes identical method sets for all three collection types: addMeasurementsDocument / addSimulationsDocument / addCacheDocument, and similarly for get and delete. Under the hood, each delegates to its own Collection instance which filters by the _cls discriminator.
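A minimal sketch of the pattern, with plain Python stand-ins for the real MongoEngine-backed classes (names mirror the description above; this is not the actual Hera implementation):

```python
# Hypothetical sketch: each Project-level add method forwards to its own
# Collection, which stamps the _cls discriminator onto the document.
class Collection:
    def __init__(self, clsTag):
        self.clsTag = clsTag  # e.g. "Metadata.Measurements"

    def addDocument(self, **doc):
        doc["_cls"] = self.clsTag  # filter/stamp by discriminator
        return doc


class Project:
    def __init__(self, projectName):
        self.projectName = projectName
        self._measurements = Collection("Metadata.Measurements")
        self._simulations = Collection("Metadata.Simulations")
        self._cache = Collection("Metadata.Cache")

    def addMeasurementsDocument(self, **kwargs):
        return self._measurements.addDocument(projectName=self.projectName, **kwargs)

    def addSimulationsDocument(self, **kwargs):
        return self._simulations.addDocument(projectName=self.projectName, **kwargs)

    def addCacheDocument(self, **kwargs):
        return self._cache.addDocument(projectName=self.projectName, **kwargs)
```

The get and delete method families follow the same delegation shape, differing only in the collection they target.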


The datatypes System

Source: hera/datalayer/datahandler.py (class datatypes)

The datatypes class defines all supported data format constants and provides the dispatch logic to read and write data in each format.

Supported Formats

Diagram: the formats grouped by backend; a "Dynamic" group holds CLASS ("Class"), which is resolved with pydoc.locate and instantiated at runtime.

Constant Value Description
STRING "string" Plain text / path string
CSV_PANDAS "csv_pandas" CSV file read via pandas
HDF "HDF" HDF5 file
NETCDF_XARRAY "netcdf_xarray" NetCDF file read via xarray
ZARR_XARRAY "zarr_xarray" Zarr archive read via xarray
JSON_DICT "JSON_dict" JSON file parsed to dict
JSON_PANDAS "JSON_pandas" JSON file read via pandas
JSON_GEOPANDAS "JSON_geopandas" GeoJSON file read via geopandas
GEOPANDAS "geopandas" Shapefile / GeoPackage read via geopandas
GEOTIFF "geotiff" GeoTIFF raster read via rasterio
PARQUET "parquet" Parquet file read via dask/pandas
IMAGE "image" Image file read via matplotlib
PICKLE "pickle" Python pickle file
DICT "dict" Inline dictionary (stored in resource)
NUMPY_ARRAY "numpy_array" NumPy .npy/.npz file
NUMPY_DICT_ARRAY "numpy_dict_array" Dict of NumPy arrays
CLASS "Class" Dynamic Python class (imported at runtime)

Format Dispatch Flow

When document.getData() is called, the system resolves the handler based on dataFormat:

Diagram: dataFormat selects the reader (load a class, read a pickle, read a GeoTIFF, read an image, or return the string as-is); every branch returns the loaded data.

Auto-Detection

The datatypes.getDataFormatName(data) static method can auto-detect the format from a Python object (DataFrame -> "parquet", xarray.DataArray -> "zarr_xarray", dict -> "pickle", etc.). This is used by Project.saveData() to automatically choose the right format and file extension.
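A stdlib-only sketch of the lookup, using a subset of the real typeDatatypeMap entries quoted later on this page (the helper names here are reimplementations, not the Hera API):

```python
# Sketch of the auto-detection: map the fully-qualified class name to a
# format name, falling back to the generic "object" entry (pickle).
def full_name(obj):
    cls = obj if isinstance(obj, type) else obj.__class__
    if cls.__module__ == "builtins":
        return cls.__qualname__  # "str", "dict", ...
    return f"{cls.__module__}.{cls.__qualname__}"

typeDatatypeMap = {
    "str": {"typeName": "string", "ext": "txt"},
    "pandas.core.frame.DataFrame": {"typeName": "parquet", "ext": "parquet"},
    "dict": {"typeName": "pickle", "ext": "pckle"},
    "object": {"typeName": "pickle", "ext": "pckle"},  # generic fallback
}

def getDataFormatName(obj):
    entry = typeDatatypeMap.get(full_name(obj), typeDatatypeMap["object"])
    return entry["typeName"]
```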


Repository JSON Structure

A repository JSON is the standard way to declare and load data into a Hera project. It maps toolkit names to their configuration, datasources, and documents.

Format

{
    "<ToolkitName>": {
        "Config": {
            "key1": "value1",
            "key2": "value2"
        },
        "Datasource": {
            "<datasource_name>": {
                "isRelativePath": "True",
                "item": {
                    "resource": "relative/path/to/data.parquet",
                    "dataFormat": "parquet",
                    "version": [0, 0, 1],
                    "desc": { ... }
                }
            }
        },
        "Measurements": {
            "<measurement_name>": {
                "isRelativePath": "True",
                "item": {
                    "resource": "relative/path/to/file.shp",
                    "dataFormat": "geopandas",
                    "type": "SomeType",
                    "desc": { ... }
                }
            }
        }
    }
}

Loading Pipeline

Diagram (sequence): dataToolkit iterates over the repository entries; for Function entries it calls the named toolkit function with its parameters, and once all items are processed it reports "Loading complete" to the user.

Path Resolution

Each item in the repository JSON has an isRelativePath flag:

  • "True" — The resource path is relative to the JSON file's directory. The loader prepends basedir to make it absolute.
  • "False" — The resource is already an absolute path and is used as-is.

String Booleans

The isRelativePath field accepts both the strings "True"/"False" and JSON booleans true/false; the loader checks for both forms. Always be explicit to avoid ambiguity.
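A sketch of this resolution (resolve_resource is an illustrative helper, not the loader's real name):

```python
import os

def resolve_resource(item, isRelativePath, basedir):
    # Accept both the real boolean and the string form "True"/"False".
    relative = isRelativePath if isinstance(isRelativePath, bool) else str(isRelativePath) == "True"
    if relative:
        # prepend the repository JSON's directory
        return os.path.join(basedir, item["resource"])
    return item["resource"]  # already absolute, use as-is
```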

Static Loading (No MongoDB)

For testing or lightweight scripts, dataToolkit provides two static methods that work without MongoDB:

from hera.utils.data.toolkit import dataToolkit

# Load and resolve all paths in one call
repo = dataToolkit.loadRepositoryFromPath("/path/to/repository.json")

# Or resolve paths on an already-parsed dict
resolved = dataToolkit.resolveDataSourcePaths(repo_dict, basedir="/data/root")

These methods perform a deep copy of the JSON and resolve all relative resource paths to absolute, but do not insert anything into MongoDB.


ToolkitDataSource Documents

When a datasource is registered via abstractToolkit.addDataSource(), it creates a special document:

{
    "projectName": "MY_PROJECT",
    "_cls": "Metadata.Measurements",
    "type": "ToolkitDataSource",
    "resource": "/data/meteorology/YAVNEEL.parquet",
    "dataFormat": "parquet",
    "desc": {
        "toolkit": "MeteoLowFreq",
        "datasourceName": "YAVNEEL",
        "version": [0, 0, 1]
    }
}

Querying Datasources

The abstractToolkit methods always filter by type="ToolkitDataSource" and toolkit=self.toolkitName. This ensures that each toolkit only sees its own datasources, even though all documents share the same MongoDB collection.
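The effective filter, sketched over plain dicts standing in for documents (the toolkit and datasource names other than the MeteoLowFreq/YAVNEEL example are illustrative):

```python
# All documents share one collection; each toolkit filters by type and by
# its own name inside desc.
documents = [
    {"type": "ToolkitDataSource", "desc": {"toolkit": "MeteoLowFreq", "datasourceName": "YAVNEEL"}},
    {"type": "ToolkitDataSource", "desc": {"toolkit": "OtherToolkit", "datasourceName": "OTHER"}},
    {"type": "Experiment_rawData", "desc": {}},
]

def getDataSourceDocuments(toolkitName):
    return [d for d in documents
            if d["type"] == "ToolkitDataSource"
            and d["desc"].get("toolkit") == toolkitName]
```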

Version Resolution

When getDataSourceDocument(name) is called without a version:

Diagram: if no default version is marked, query all matching documents, sort by version tuple, and pick the highest (PickMax --> ReturnDoc); otherwise return the default-version document directly (QueryDefault --> ReturnDoc).
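The highest-version pick reduces to a tuple comparison over the version lists; a stdlib illustration:

```python
# Version lists compare element-wise once converted to tuples, so max()
# with a tuple key selects the highest version.
docs = [
    {"desc": {"datasourceName": "YAVNEEL", "version": [0, 0, 1]}},
    {"desc": {"datasourceName": "YAVNEEL", "version": [0, 1, 0]}},
    {"desc": {"datasourceName": "YAVNEEL", "version": [0, 0, 2]}},
]
latest = max(docs, key=lambda d: tuple(d["desc"]["version"]))
```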

addDataSource Swimlane

The full call chain when a toolkit registers a new data source runs from the toolkit API down through the data layer to MongoDB.

add[Type]Document Swimlane

The call chain for adding documents to each collection (Measurements, Simulations, Cache) follows the same pattern in all three cases; only the collection class differs.

loadData Swimlane (HighFreqToolKit)

The complete flow for ingesting raw sensor data runs from parsing through to data source registration.


Connection Management (document/__init__.py)

How connections are established

When hera is imported, the document/__init__.py module automatically connects to all databases defined in ~/.pyhera/config.json:

# Runs at import time (bottom of document/__init__.py)
for user in getDBNamesFromJSON():
    createDBConnection(
        connectionName=user,
        mongoConfig=getMongoConfigFromJson(connectionName=user)
    )

Dynamic class creation

MongoDB document classes are created dynamically at runtime using Python's type() builtin. This allows each database connection to have its own set of MongoEngine document classes with the correct db_alias:

# Creates a new class: Metadata(DynamicDocument, MetadataFrame)
new_Metadata = type('Metadata', (DynamicDocument, MetadataFrame), {
    'meta': {
        'db_alias': f'{dbName}-alias',  # binds to specific DB
        'allow_inheritance': True,       # enables Measurements/Simulations/Cache subtypes
        'auto_create_indexes': True,
        'indexes': ['projectName']       # index for fast project queries
    }
})

# Subtypes inherit from the dynamic Metadata class
new_Measurements = type('Measurements', (new_Metadata,), {})
new_Simulations = type('Simulations', (new_Metadata,), {})
new_Cache = type('Cache', (new_Metadata,), {})
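The same mechanism demonstrated with plain classes (no MongoEngine), showing what type(name, bases, namespace) produces:

```python
# type(name, bases, namespace) returns a new class object, exactly as if
# it had been written with a class statement.
Metadata = type("Metadata", (object,), {"meta": {"allow_inheritance": True}})

# Subtypes are ordinary subclasses of the dynamically created base.
Measurements = type("Measurements", (Metadata,), {})
Cache = type("Cache", (Metadata,), {})
```

Because each call to type() produces a fresh class object, every DB connection can get its own independent Metadata hierarchy bound to its own db_alias.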

The dbObjects registry

All connections and document classes are stored in a module-level dictionary:

dbObjects = {
    "connectionName1": {
        "connection": <mongoengine connection>,
        "Metadata": <dynamic Metadata class>,
        "Measurements": <dynamic Measurements class>,
        "Simulations": <dynamic Simulations class>,
        "Cache": <dynamic Cache class>,
    },
    "connectionName2": { ... },
}

getDBObject(objectName, connectionName) retrieves a class from this registry. Collections use it to get their MongoEngine document class:

# Inside AbstractCollection.__init__:
self._metadataCol = getDBObject('Metadata', connectionName)
# or for typed collections:
self._metadataCol = getDBObject('Measurements', connectionName)

Multi-database support

Each connection name maps to a separate MongoDB database. This enables:

  • Different projects on different servers
  • Shared "public" databases alongside local ones
  • Parallel connections with different aliases


MetadataFrame (document/metadataDocument.py)

getData() dispatch

MetadataFrame.getData() is the bridge between metadata and actual data:

def getData(self, **kwargs):
    storeParametersDict = self.desc.get("storeParameters", {})
    storeParametersDict.update(kwargs)
    return getHandler(self.dataFormat).getData(
        resource=self.resource, desc=self.desc, **storeParametersDict
    )
  1. Reads storeParameters from the document's desc — these were saved when the data was written (e.g., usePandas=True for parquet)
  2. Merges with any kwargs passed by the caller
  3. Calls getHandler(dataFormat) to find the right DataHandler_* class
  4. Delegates to the handler's getData(resource, desc, **params)
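The merge order in step 2 matters: stored parameters form the base and caller kwargs win. A minimal illustration (merge_params is an illustrative helper, not part of Hera):

```python
def merge_params(storeParameters, **kwargs):
    # stored parameters first, then caller overrides
    params = dict(storeParameters)
    params.update(kwargs)
    return params
```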

nonDBMetadataFrame

A wrapper for data that isn't stored in MongoDB. Used by saveData when saveMode=NOSAVE and by createNewArea when data is computed in memory:

class nonDBMetadataFrame:
    def __init__(self, data, projectName=None, type=None, ...):
        self._data = data   # the actual Python object

    def getData(self, **kwargs):
        return self._data   # just returns the object, no handler dispatch

DataHandler Pattern (datahandler.py)

How handlers work

Each DataHandler_* class is a static utility with two methods:

class DataHandler_parquet:
    @staticmethod
    def saveData(resource, fileName, **kwargs):
        # Save the data object to disk
        resource.to_parquet(fileName, **kwargs)
        return {"usePandas": True}  # store parameters returned to caller

    @staticmethod
    def getData(resource, desc={}, usePandas=False, **kwargs):
        # Load data from disk
        df = dask.dataframe.read_parquet(resource, **kwargs)
        if usePandas:
            df = df.compute()
        return df

Key pattern:

  • saveData writes to disk and returns a dict of store parameters — these are saved in desc.storeParameters so getData can reproduce the exact same load behavior.
  • getData reads from disk using resource (the file path) and desc for metadata.

Handler dispatch

def getHandler(objectType):
    handlerName = f"DataHandler_{objectType}"
    return getattr(datahandler_module, handlerName)

objectType is the dataFormat string (e.g., "parquet" resolves to DataHandler_parquet).
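A self-contained version of the same lookup, using a local namespace in place of importing hera.datalayer.datahandler:

```python
# Two stub handler classes standing in for the real DataHandler_* classes.
class DataHandler_parquet:
    pass

class DataHandler_pickle:
    pass

def getHandler(objectType, namespace=globals()):
    # Build the handler class name from the format string and look it up.
    handlerName = f"DataHandler_{objectType}"
    if handlerName not in namespace:
        raise ValueError(f"The data handler for the type {objectType} is not known")
    return namespace[handlerName]
```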

Auto-detection

When saving data with Project.saveData(), the format is auto-detected:

datatypes.typeDatatypeMap = {
    "pandas.core.frame.DataFrame": {"typeName": "parquet", "ext": "parquet"},
    "geopandas.geodataframe.GeoDataFrame": {"typeName": "geopandas", "ext": "gpkg"},
    "xarray.core.dataarray.DataArray": {"typeName": "zarr_xarray", "ext": "zarr"},
    "numpy.ndarray": {"typeName": "numpy_array", "ext": "npy"},
    "dict": {"typeName": "pickle", "ext": "pckle"},
    # ...
}

datatypes.getDataFormatName(obj) looks up the fully-qualified class name in this map and returns the format string.

Adding a new handler

  1. Create a class DataHandler_myformat in datahandler.py:

    class DataHandler_myformat:
        @staticmethod
        def saveData(resource, fileName, **kwargs):
            # write resource to fileName
            return {}
    
        @staticmethod
        def getData(resource, desc={}, **kwargs):
            # read and return data from resource
            pass
    

  2. Add a constant to datatypes:

    MYFORMAT = "myformat"
    

  3. Optionally add to typeDatatypeMap for auto-detection:

    "mypackage.MyClass": {"typeName": "myformat", "ext": "myext"}
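Putting the three steps together: a toy handler that stores dicts as JSON (the myformat name is illustrative, following the naming convention above):

```python
import json

class DataHandler_myformat:
    @staticmethod
    def saveData(resource, fileName, **kwargs):
        # On save, 'resource' is the in-memory object; write it to fileName.
        with open(fileName, "w") as outfile:
            json.dump(resource, outfile)
        return {}  # no store parameters needed for this format

    @staticmethod
    def getData(resource, desc={}, **kwargs):
        # On load, 'resource' is the file path.
        with open(resource) as infile:
            return json.load(infile)
```

Once the class exists in datahandler.py, getHandler("myformat") finds it by name, so no registration step beyond defining the class is required for dispatch.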
    


Function Caching (autocache.py)

How @cacheFunction works

The cacheFunction decorator caches function return values in the project database:

@cacheFunction(returnFormat=datatypes.PARQUET, projectName="MY_PROJECT")
def expensive_computation(x, y):
    # ... long computation ...
    return result_df

Cache lookup flow

1. Function called with (args, kwargs)
2. Bind args to function signature → dict of all parameters
3. Convert to JSON (ConfigurationToJSON) with standardized MKS units
4. Serialize non-BSON values to base64 text
5. Add function's fully-qualified name
6. Query Cache collection: type="functionCacheData" + all serialized params
7a. Cache HIT → doc.getData() → return
7b. Cache MISS → execute function → saveData → create cache document → return
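The flow above, reduced to an in-memory sketch: a dict stands in for the Cache collection, and the key is a plain JSON of the bound arguments rather than the full BSON/base64 serialization (so this is a simplification, not the real autocache code):

```python
import functools
import inspect
import json

_cacheStore = {}  # stand-in for the Cache collection

def cacheFunction(func):
    signature = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Bind args to the signature so (3,) and (3, y=2) hash identically.
        bound = signature.bind(*args, **kwargs)
        bound.apply_defaults()
        key = json.dumps({"function": f"{func.__module__}.{func.__qualname__}",
                          "arguments": {k: repr(v) for k, v in bound.arguments.items()}},
                         sort_keys=True)
        if key not in _cacheStore:          # cache MISS: execute and store
            _cacheStore[key] = func(*args, **kwargs)
        return _cacheStore[key]             # cache HIT: return stored result

    return wrapper

calls = []

@cacheFunction
def expensive_computation(x, y=2):
    calls.append((x, y))
    return x * y
```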

Argument serialization

Each function argument is checked for BSON compatibility:

for key, value in call_info.items():
    try:
        BSON.encode({'test': value})   # probe BSON compatibility
        call_info_serialized[key] = (True, value)                    # store as-is
    except Exception:
        call_info_serialized[key] = (False, base64(pickle(value)))   # serialize to text

This handles complex objects (numpy arrays, custom classes) that MongoDB can't store natively.
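A concrete round trip of the fallback branch:

```python
import base64
import pickle

value = {"weights": [1.5, 2.5], "label": "run-7"}  # illustrative payload

# Serialize to a text-safe string that MongoDB can store in a str field.
blob = base64.b64encode(pickle.dumps(value)).decode("ascii")

# Deserialize on cache lookup.
restored = pickle.loads(base64.b64decode(blob))
```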

Unit standardization

Arguments with physical units (pint Quantities or Unum) are converted to MKS before querying. This means 5 * ureg.km and 5000 * ureg.m produce the same cache key — the cache is unit-aware.
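The effect can be illustrated without pint; a factor table stands in for the real unit conversion:

```python
# Illustrative MKS normalization: both quantities collapse to the same
# number of meters, so they produce identical cache keys.
MKS_FACTORS = {"m": 1.0, "km": 1000.0, "cm": 0.01}

def to_mks(magnitude, unit):
    return magnitude * MKS_FACTORS[unit]

key_a = to_mks(5, "km")     # 5 km
key_b = to_mks(5000, "m")   # 5000 m
```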


API Reference

hera.datalayer.datahandler.datatypes

Registry of supported data format constants and dispatch logic for data handlers.

Each constant (e.g. STRING, PARQUET, HDF) identifies a data format. Use getHandler(formatName) to retrieve the corresponding DataHandler_* class, or getDataFormatName(obj) to auto-detect the format from a Python object.

Source code in hera/datalayer/datahandler.py
class datatypes:
    """
    Registry of supported data format constants and dispatch logic for data handlers.

    Each constant (e.g. ``STRING``, ``PARQUET``, ``HDF``) identifies a data format.
    Use ``getHandler(formatName)`` to retrieve the corresponding ``DataHandler_*`` class,
    or ``getDataFormatName(obj)`` to auto-detect the format from a Python object.
    """
    STRING = "string"
    TIME = "time"
    CSV_PANDAS = "csv_pandas"
    HDF = "HDF"
    NETCDF_XARRAY = "netcdf_xarray"
    ZARR_XARRAY = "zarr_xarray"
    JSON_DICT = "JSON_dict"
    JSON_PANDAS = "JSON_pandas"
    JSON_GEOPANDAS = "JSON_geopandas"
    GEOPANDAS = "geopandas"
    GEOTIFF = "geotiff"
    PARQUET = "parquet"
    IMAGE = "image"
    PICKLE = "pickle"
    DICT = "dict"
    NUMPY_ARRAY = "numpy_array"
    NUMPY_DICT_ARRAY = "numpy_dict_array"  # A dict of numpy arrays, no automatic detection.
    CLASS = "Class"

    @staticmethod
    def get_obj_or_instance_fullName(obj):
        """
        Returns the fully qualified name of a class or instance, including its module.

        Examples:
            >>> datatypes.get_obj_or_instance_fullName(SomeClass)
            'package.module.SomeClass'

            >>> datatypes.get_obj_or_instance_fullName(SomeClass())
            'package.module.SomeClass'
        """
        # If it's a class
        if isinstance(obj, type):
            cls = obj
        else:
            cls = obj.__class__

        module = cls.__module__
        qualname = cls.__qualname__

        if module == "builtins":
            return qualname  # No need to show 'builtins' for int, str, etc.
        return f"{module}.{qualname}"

    typeDatatypeMap = {
        "str": dict(typeName=STRING, ext="txt"),
        "pandas.core.frame.DataFrame": dict(typeName=PARQUET, ext="parquet"),
        'pandas.core.series.Series': dict(typeName=JSON_PANDAS, ext="json"),
        "dask_expr._collection.DataFrame": dict(typeName=PARQUET, ext="parquet"),
        'geopandas.geodataframe.GeoDataFrame': dict(typeName=GEOPANDAS, ext="gpkg"),
        'xarray.core.dataarray.DataArray': dict(typeName=ZARR_XARRAY, ext="zarr"),
        "dict": dict(typeName=PICKLE, ext="pckle"),
        "list": dict(typeName=PICKLE, ext="pckle"),
        "bytes": dict(typeName=PICKLE, ext="pckle"),
        "object": dict(typeName=PICKLE, ext="pckle"),
        "numpy.ndarray": dict(typeName=NUMPY_ARRAY, ext="npy")
    }

    @staticmethod
    def getDataFormatName(obj_or_class):
        """
            Tries to find the datatype name in hera for the object.
            if cannot found, use general object.

        Parameters
        ----------
        obj_or_class : object or type.

        Returns
        -------
            A dict with
                - typeName : the string that identifies the datahandler.
                -ext : the extension of the file name.
        """
        objTypeName = datatypes.get_obj_or_instance_fullName(obj_or_class)


        dataItemName = datatypes.typeDatatypeMap["object"] if objTypeName not in datatypes.typeDatatypeMap else \
        datatypes.typeDatatypeMap[objTypeName]

        return dataItemName["typeName"]

    @staticmethod
    def getDataFormatExtension(obj_or_class):
        """
            Tries to find the datatype name in hera for the object.
            if cannot found, use general object.

        Parameters
        ----------
        obj_or_class : object or type.

        Returns
        -------
            A dict with
                - typeName : the string that identifies the datahandler.
                -ext : the extension of the file name.
        """
        objTypeName = datatypes.get_obj_or_instance_fullName(obj_or_class)


        dataItemName = datatypes.typeDatatypeMap["object"] if objTypeName not in datatypes.typeDatatypeMap else \
        datatypes.typeDatatypeMap[objTypeName]

        return dataItemName["ext"]

    @staticmethod
    def guessHandler(obj_or_class):
        """
        Auto-detect the data format and return the appropriate handler class.

        Parameters
        ----------
        obj_or_class : object or type
            The data object or class to detect the format for.

        Returns
        -------
        DataHandler class
            The handler class for the detected format.
        """
        dataTypeName = datatypes.getDataFormatName(obj_or_class)

        return datatypes.getHandler(objectType=dataTypeName)

    @staticmethod
    def getHandler(objectType):
        """
        Return the DataHandler class for the given data format name.

        Parameters
        ----------
        objectType : str
            A data format name (e.g. ``datatypes.PARQUET``).

        Returns
        -------
        DataHandler class

        Raises
        ------
        ValueError
            If no handler exists for the given type.
        """
        dataHandlerModule = importlib.import_module("hera.datalayer.datahandler")

        handlerName = f"DataHandler_{objectType}"

        if not hasattr(dataHandlerModule, handlerName):
            raise ValueError(f"The data handler for the type {objectType} is not known")

        return getattr(dataHandlerModule, handlerName)


hera.utils.data.toolkit.dataToolkit

Bases: abstractToolkit

Toolkit for managing data repositories (replacing the old hera-data).

It is initialized only with the DEFAULT project.

The structure of a datasource file is:

{
    "<toolkit name>": {
        "<datasource name>": {
            "resource": "<location of datasource>",
            "dataFormat": "<type of data source>",
            "desc": {
                ... metadata ...
            }
        },
        ...
    },
    ...
}
Source code in hera/utils/data/toolkit.py
class dataToolkit(abstractToolkit):
    """
    Toolkit for managing data repositories (replacing the old hera-data).

    It is initialized only with the DEFAULT project.

    The structure of a datasource file is:

        {
            "<toolkit name>": {
                "<datasource name>": {
                    "resource": "<location of datasource>",
                    "dataFormat": "<type of data source>",
                    "desc": {
                        ... metadata ...
                    }
                },
                ...
            },
            ...
        }
    """

    def __init__(self, connectionName=None):
        """
        Initialize the dataToolkit on the default project.

        Parameters
        ----------
        connectionName : str, optional
            The DB connection name. If None, uses the current OS username.
        """
        super().__init__(toolkitName="heradata", projectName=self.DEFAULTPROJECT, filesDirectory=None, connectionName=connectionName)

    def addRepository(self, repositoryName, repositoryPath, overwrite=False):
        """
        Register a repository JSON file as a data source.

        Parameters
        ----------
        repositoryName : str
            The name to register the repository under.
        repositoryPath : str
            Path to the repository JSON file. ``.json`` extension is appended if missing.
        overwrite : bool
            If True, overwrite an existing repository with the same name.
        """
        self._allowWritingToDefaultProject = True  # allows the addition of datasource to the Default project.

        repositoryPath = f"{repositoryPath}.json" if "json" not in repositoryPath else repositoryPath
        self.addDataSource(dataSourceName=repositoryName, resource=os.path.abspath(repositoryPath),
                           dataFormat=self.datatypes.JSON_DICT, overwrite=overwrite)
        self._allowWritingToDefaultProject = False

    def getRepositoryTable(self):
        """
        Return a DataFrame listing all registered repositories.

        Returns
        -------
        pandas.DataFrame
        """
        return self.getDataSourceTable()

    def getRepository(self, repositoryName):
        """
        Load and return a repository's JSON content by name.

        Parameters
        ----------
        repositoryName : str
            The name of the registered repository.

        Returns
        -------
        dict
            The parsed repository JSON.
        """
        logger = get_classMethod_logger(self, "getRepository")
        logger.info(f"Trying to find repository {repositoryName} in project {self.DEFAULTPROJECT}")
        repo = self.getDataSourceData(datasourceName=repositoryName)

        return loadJSON(repo)

    def loadAllDatasourcesInAllRepositoriesToProject(self, projectName, overwrite=False):
        """
        Load all data sources from all registered repositories into a project.

        Parameters
        ----------
        projectName : str
            The target project name.
        overwrite : bool
            If True, overwrite existing data sources.
        """
        logger = get_classMethod_logger(self, "loadAllDatasourcesInAllRepositoriesToProject")
        for repository in self.getDataSourceList():
            try:
                logger.info(f"Loading the repository {repository} to project {projectName}")
                self.loadAllDatasourcesInRepositoryToProject(projectName, repositoryName=repository,
                                                             overwrite=overwrite)
            except ValueError as e:
                logger.info(
                    f"Did not load repository {repository}: an error occurred while loading it.\n The error message: {e}")

    def loadAllDatasourcesInRepositoryToProject(self, projectName, repositoryName, overwrite=False):
        """
        Load all data sources from a specific repository into a project.

        Parameters
        ----------
        projectName : str
            The target project name.
        repositoryName : str
            The name of the registered repository to load from.
        overwrite : bool
            If True, overwrite existing data sources.
        """
        logger = get_classMethod_logger(self, "loadAllDatasourcesInRepositoryToProject")
        logger.info(f"Loading repository {repositoryName}")
        repdoc = self.getDataSourceDocument(repositoryName)
        conf = repdoc.getData()
        logger.info(f"Data: {conf}")
        basedir = os.path.dirname(repdoc.resource)
        logger.info(f"basedir: {basedir}")
        logger.info(f"Loading the items in {repositoryName} repository to the {projectName}")
        self.loadAllDatasourcesInRepositoryJSONToProject(projectName=projectName,
                                                         repositoryJSON=conf,
                                                         basedir=basedir,
                                                         overwrite=overwrite)

    # hera/utils/data/toolkit.py  (inside class dataToolkit)
    # -----------------------------------------------------------------------------
    # Load all datasources from a repository JSON into a project.
    # If a toolkit is missing, try to auto-register it using classpath hints.
    # -----------------------------------------------------------------------------
    def getToolkitDocument(self, toolkit_name: str):
        """
        Find a dynamic toolkit document by name (either desc.datasourceName or desc.toolkit).
        Returns the mongoengine document or None.
        """
        # First: direct filter on datasourceName (works on most implementations)
        try:
            q = self.getMeasurementsDocuments(
                type="ToolkitDataSource", datasourceName=toolkit_name
            )
            if q and len(q) > 0:
                return q[0]
        except Exception:
            # fall through to broader search below
            pass

        # Second: scan all ToolkitDataSource docs and match by desc fields
        try:
            q = self.getMeasurementsDocuments(type="ToolkitDataSource")
            for d in q:
                desc = d.desc or {}
                if desc.get("datasourceName") == toolkit_name or desc.get("toolkit") == toolkit_name:
                    return d
        except Exception:
            pass

        # Optional: also look in DataSource collection if your project uses it
        try:
            q = self.getDataSourceDocuments(datasourceName=toolkit_name)
            if q and len(q) > 0:
                return q[0]
        except Exception:
            pass

        return None


    def loadAllDatasourcesInRepositoryJSONToProject(self,
                                                    projectName: str,
                                                    repositoryJSON: dict,
                                                    basedir: str = "",
                                                    overwrite: bool = False,
                                                    auto_register_missing: bool = True):
        """
        Iterate through the repository JSON and for each toolkit:
        - Try to get an instance via ToolkitHome.getToolkit.
        - If missing and auto_register_missing=True, attempt auto-register ONLY if there is
          a clear classpath hint in the JSON (Registry.classpath or Registry.cls).
        - After we have a valid instance, dispatch to the appropriate handler per section.
        """
        logger = get_classMethod_logger(self, "loadAllDatasourcesInRepositoryJSONToProject")
        if isinstance(repositoryJSON, str):
            if  repositoryJSON.startswith('/'): # if there is no data
                logger.info("skipping dynamic toolkit")
                return
            try:
                repositoryJSON = json.loads(repositoryJSON)
            except json.JSONDecodeError:
                logger.error("repositoryJSON is a string but not a valid JSON format.")
                return
        if not isinstance(repositoryJSON, dict):
            logger.warning(f"Expected dict for repositoryJSON, got {type(repositoryJSON)}. Skipping.")
            return
        if not repositoryJSON:
            logger.info("repositoryJSON is empty. Nothing to load.")
            return
        handlerDict = dict(
            Config=self._handle_Config,
            Datasource=self._handle_DataSource,
            Measurements=lambda toolkit, itemName, docTypeDict, overwrite, basedir: self._DocumentHandler(
                toolkit, itemName, docTypeDict, overwrite, "Measurements", basedir
            ),
            Simulations=lambda toolkit, itemName, docTypeDict, overwrite, basedir: self._DocumentHandler(
                toolkit, itemName, docTypeDict, overwrite, "Simulations", basedir
            ),
            Cache=lambda toolkit, itemName, itemDesc, overwrite, basedir: self._DocumentHandler(
                toolkit, itemName, itemDesc, overwrite, "Cache", basedir
            ),
            Function=self._handle_Function,
        )

        tk_home = ToolkitHome(projectName=projectName)

        for toolkitName, toolkitDict in (repositoryJSON or {}).items():
            # 1) Try static/dynamic resolution via ToolkitHome.getToolkit
            try:
                toolkit = tk_home.getToolkit(toolkitName=toolkitName)

            except Exception as e:
                logger.info(f"Toolkit '{toolkitName}' not found via getToolkit: {e}")
                toolkit = None



            # 2) If we do not have a toolkit instance, skip this key quietly
            if toolkit is None:
                logger.info(
                    f"Skipping key '{toolkitName}' in repository JSON – "
                    f"no matching toolkit and no auto-registration performed."
                )
                continue

            # 3) Dispatch sections (Config, Datasource, Measurements, Simulations, Cache, Function)
            for key, docTypeDict in toolkitDict.items():
                logger.info(f"Loading document type {key} to toolkit {toolkitName}")
                handler = handlerDict.get(key.title(), None)

                if handler is None:
                    err = (
                        f"Unkonw Handler {key.title()}. "
                        f"The handler must be {', '.join(handlerDict.keys())}. "
                    )
                    logger.error(err)
                    raise ValueError(err)

                try:
                    handler(
                        toolkit=toolkit,
                        itemName=key,
                        docTypeDict=docTypeDict,
                        overwrite=overwrite,
                        basedir=basedir,
                    )
                except Exception as e:
                    err = (
                        f"The error {e} occured while adding *{key}* to toolkit {toolkitName}... skipping!!!"
                    )
                    logger.error(err)


    def _handle_Config(self, toolkit, itemName, docTypeDict, overwrite, basedir):
        """
        Handle a Config section from a repository JSON by calling ``toolkit.setConfig``.

        Parameters
        ----------
        toolkit : abstractToolkit
            The toolkit instance to configure.
        itemName : str
            The section name (unused, always 'Config').
        docTypeDict : dict
            Key-value pairs to set as configuration.
        overwrite : bool
            Whether to overwrite existing values.
        basedir : str
            Base directory for resolving relative paths (unused for Config).
        """
        toolkit.setConfig(**docTypeDict)

    def _handle_DataSource(self, toolkit, itemName, docTypeDict, overwrite, basedir):
        """
        Handle a DataSource section from a repository JSON by adding data sources to the toolkit.

        Parameters
        ----------
        toolkit : abstractToolkit
            The toolkit instance to add data sources to.
        itemName : str
            The section name.
        docTypeDict : dict
            Dictionary mapping data source names to their descriptions.
        overwrite : bool
            If True, overwrite existing data sources.
        basedir : str
            Base directory for resolving relative resource paths.
        """
        logger = get_classMethod_logger(self, "_handle_DataSource")

        for itemName, itemDesc in docTypeDict.items():
            theItem = itemDesc["item"]

            isRelativePath = itemDesc.get("isRelativePath")
            assert (isRelativePath=='True' or isRelativePath=='False') or isinstance(isRelativePath,bool), "isRelativePath must be defined as 'True' or 'False'. "


            if 'resource' in theItem and "resourceFilePath" in theItem:
                logger.warning(f"both resource and resourceFilePath are defined for datasource {itemName}, using just resource")
                theItem.pop("resourceFilePath")

            if 'resource' not in theItem and "resourceFilePath" in theItem:
                if isRelativePath=='True' or isRelativePath is True:
                    logger.debug(
                        f"The resource path is relative; prepending {basedir} to {theItem['resourceFilePath']}")
                    theItem["resourceFilePath"] = os.path.join(basedir, theItem["resourceFilePath"])

                logger.info("detected dataSource resource specified using file's contents")
                try:
                    with open(theItem.pop("resourceFilePath")) as dataSourceResourceFile:
                        theItem['resource'] = json.load(dataSourceResourceFile)
                        logger.info("extracted resource from file successfully")
                except Exception as e:
                    logger.error(f"failed reading resource from file, {e}")
            else:
                # logger.debug(f"Checking if {itemName} resource is a path {isRelativePath}, is it absolute? {isAbsolute}")
                if isRelativePath=='True' or isRelativePath is True:
                    logger.debug(
                        f"The resource path is relative; prepending {basedir} to {theItem['resource']}")
                    theItem["resource"] = os.path.join(basedir, theItem["resource"])




            logger.debug(f"Checking if the data item {itemName} is already in project {toolkit.projectName}")
            datasource = toolkit.getDataSourceDocuments(datasourceName=itemName)
            if len(datasource) == 0 or overwrite:

                if len(datasource) == 1:
                    logger.debug("Remove the old datasource")
                    toolkit.deleteDataSource(datasourceName=itemName)

                logger.debug("Adding a new datasource")
                theItem['dataSourceName'] = itemName
                theItem['overwrite'] = overwrite
                toolkit.addDataSource(**theItem)
                logger.info(f"Added source {itemName} to tool {toolkit.toolkitName} in project {toolkit.projectName}")
            else:
                logger.error(f"Source {itemName} already exists in {toolkit.projectName}. Use --overwrite to force update")

    def _DocumentHandler(self, toolkit, itemName, docTypeDict, overwrite, documentType, basedir):
        """
        Handle a Measurements, Simulations, or Cache section from a repository JSON.

        Parameters
        ----------
        toolkit : abstractToolkit
            The toolkit instance to add documents to.
        itemName : str
            The section name.
        docTypeDict : dict
            Dictionary mapping document names to their descriptions.
        overwrite : bool
            If True, overwrite existing documents.
        documentType : str
            One of 'Measurements', 'Simulations', or 'Cache'.
        basedir : str
            Base directory for resolving relative resource paths.
        """
        logger = get_classMethod_logger(self, "_handle_Document")
        logger.info(f"Loading {itemName} to toolkit {toolkit.toolkitName} (ProjectName {toolkit.projectName}")
        for itemName, itemDesc in docTypeDict.items():
            theItem = itemDesc["item"]
            theItem["resource"] = self._makeItemPathAbsolute(theItem,basedir)

            logger.debug(f"Checking if the data item {itemName} is already in the project")
            retrieveFuncName = f"get{documentType}Documents"
            retrieveFunc = getattr(toolkit, retrieveFuncName, None)
            if retrieveFunc is None:
                raise ValueError(
                    f"function {retrieveFuncName} not found. Key {documentType} must be one of: Measurements, Simulations, or Cache")
            qrydict = dict(theItem)
            qrydict.pop('resource', None)
            qrydict.pop('dataFormat', None)
            itemQry = dictToMongoQuery(qrydict)
            datasource = retrieveFunc(**itemQry)
            logger.debug(f"Found {len(datasource)} documents")

            if len(datasource) == 0:
                funcName = f"add{documentType}Document"

                logger.debug(f"Adding the document of type {documentType} using the function {funcName}")
                func = getattr(toolkit, funcName)

                func(**theItem)
                logger.info(f"Added source {itemName} to tool {toolkit.toolkitName} in project {toolkit.projectName}")

            elif overwrite:
                logger.debug("Updating an existing document")
                dataitem = datasource[0]
                dataitem['resource'] = theItem["resource"]
                dataitem['dataFormat'] = theItem['dataFormat']
                curDesc = theItem.get("desc", {})
                curDesc.update(dataitem['desc'])
                dataitem['desc'] = curDesc
                dataitem.save()
                logger.info(f"Updated source {itemName} in tool {toolkit.toolkitName} in project {toolkit.projectName}")
            else:
                logger.error(
                    f"Source {itemName} already exists in {toolkit.projectName}. Use --overwrite to force update")

    def _handle_Function(self, toolkit, itemName, docTypeDict, overwrite, basedir):
        """
        Handle a Function section by calling named methods on the toolkit.

        Each key in ``docTypeDict`` is a method name on ``self``. The value can be:
        - A dict: passed as keyword arguments to a single call.
        - A list of dicts: each dict triggers a separate call.

        The called method must accept an ``overwrite`` keyword argument.

        Parameters
        ----------
        toolkit : abstractToolkit
            The toolkit instance (unused directly; methods are called on ``self``).
        itemName : str
            The section name.
        docTypeDict : dict
            Maps method names to their argument(s).
        overwrite : bool
            Passed to each method call.
        basedir : str
            Base directory (unused for Function).
        """
        logger = get_classMethod_logger(self, "_handle_GeneralFunction")
        for itemName, itemDesc in docTypeDict.items():
            retrieveFunc = getattr(self,itemName)

            if isinstance(itemDesc,dict):
                retrieveFunc(**itemDesc,overwrite=overwrite)
            elif isinstance(itemDesc,list):
                for imt in itemDesc:
                    if isinstance(imt,dict):
                        retrieveFunc(**imt, overwrite=overwrite)
                    else:
                        err = f"{itemName} has a non dict item in the list : {imt}... ignoring."
                        logger.error(err)
            else:
                err = f"{itemName} value must be dict of a list of dicts. "
                logger.error(err)
                raise ValueError(err)


    def _makeItemPathAbsolute(self, theItem, basedir):
        """
        Convert a resource path to absolute if the ``isRelativePath`` flag is set.

        Parameters
        ----------
        theItem : dict
            The item data containing ``resource`` and optionally ``isRelativePath``.
        basedir : str
            Base directory to resolve relative paths against.

        Returns
        -------
        str
            The absolute resource path.
        """
        logger = get_classMethod_logger(self, "_makeItemPathAbsolute")
        isRelativePath = bool(theItem.get("isRelativePath", True))
        # logger.debug(f"Checking if {itemName} resource is a path {isRelativePath}, is it absolute? {isAbsolute}")

        if isRelativePath:
            logger.debug(
                f"The input is not absolute (it is relative). Adding the path {basedir} to the resource {theItem['resource']}")

        return os.path.join(basedir, theItem["resource"]) if isRelativePath else theItem["resource"]

    # -------------------------------------------------------------------------
    # Direct-load helpers (no MongoDB round-trip required)
    # -------------------------------------------------------------------------

    @staticmethod
    def resolveDataSourcePaths(repositoryJSON, basedir=""):
        """
        Walk a repository JSON dict and resolve every ``resource`` field to an
        absolute path, respecting the ``isRelativePath`` flag on each entry.

        Parameters
        ----------
        repositoryJSON : dict
            The parsed repository JSON (toolkit-name -> section dict).
        basedir : str
            The base directory against which relative paths are resolved.
            Typically the directory that contains the repository JSON file.

        Returns
        -------
        dict
            A *deep copy* of ``repositoryJSON`` with all ``resource`` fields
            converted to absolute paths.
        """
        import copy
        resolved = copy.deepcopy(repositoryJSON)

        for _toolkitName, toolkitDict in resolved.items():
            if not isinstance(toolkitDict, dict):
                continue
            for sectionKey, sectionDict in toolkitDict.items():
                if not isinstance(sectionDict, dict):
                    continue
                for itemName, itemDesc in sectionDict.items():
                    if not isinstance(itemDesc, dict):
                        continue
                    # Handle entries that have an "item" wrapper
                    item = itemDesc.get("item", itemDesc)
                    if "resource" not in item:
                        continue
                    is_rel = itemDesc.get("isRelativePath", item.get("isRelativePath"))
                    if is_rel == "True" or is_rel is True:
                        item["resource"] = os.path.abspath(
                            os.path.join(basedir, item["resource"])
                        )
        return resolved

    @staticmethod
    def loadRepositoryFromPath(json_path):
        """
        Read a repository JSON file directly from disk, resolve all relative
        ``resource`` paths to absolute paths based on the JSON file's directory,
        and return the resulting dict.

        This allows tests (and lightweight scripts) to work with repository
        data without going through ``addRepository`` + MongoDB storage.

        Parameters
        ----------
        json_path : str
            Path to the repository JSON file.

        Returns
        -------
        dict
            The repository dict with all resource paths resolved to absolute.

        Raises
        ------
        FileNotFoundError
            If *json_path* does not exist.
        """
        json_path = os.path.abspath(json_path)
        if not os.path.isfile(json_path):
            raise FileNotFoundError(f"Repository JSON not found: {json_path}")

        with open(json_path, "r", encoding="utf-8") as fh:
            repo_json = json.load(fh)

        basedir = os.path.dirname(json_path)
        return dataToolkit.resolveDataSourcePaths(repo_json, basedir=basedir)
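
Reading the handlers together fixes the shape of a repository JSON. The sketch below is a minimal, hypothetical example (toolkit, item, and file names invented; `dataFormat` values illustrative) showing where each handler looks for its fields:

```python
# Hypothetical repository JSON, as a Python dict. Top-level keys are toolkit
# names resolved via ToolkitHome.getToolkit; section keys are matched
# case-insensitively (via key.title()) against Config, Datasource,
# Measurements, Simulations, Cache, and Function.
repository = {
    "demoToolkit": {
        "Config": {                        # passed as toolkit.setConfig(**...)
            "units": "metric",
        },
        "Datasource": {                    # consumed by _handle_DataSource
            "terrain": {
                # note: the relative-path flag sits NEXT TO "item" here
                "isRelativePath": "True",
                "item": {
                    "resource": "data/terrain.geojson",
                    "dataFormat": "JSON_DICT",
                },
            },
        },
        "Measurements": {                  # consumed by _DocumentHandler
            "campaign1": {
                "item": {
                    # note: _makeItemPathAbsolute reads the flag INSIDE "item"
                    "isRelativePath": True,
                    "resource": "data/campaign1.parquet",
                    "dataFormat": "parquet",
                    "type": "Experiment_rawData",
                    "desc": {"station": "north"},
                },
            },
        },
    },
}
```

The asymmetry in where `isRelativePath` lives (beside `item` for Datasource, inside `item` for the document sections) mirrors the handlers above; `resolveDataSourcePaths` checks both locations.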

addRepository(repositoryName, repositoryPath, overwrite=False)

Register a repository JSON file as a data source.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| repositoryName | str | The name to register the repository under. | required |
| repositoryPath | str | Path to the repository JSON file. `.json` extension is appended if missing. | required |
| overwrite | bool | If True, overwrite an existing repository with the same name. | False |
Source code in hera/utils/data/toolkit.py
def addRepository(self, repositoryName, repositoryPath, overwrite=False):
    """
    Register a repository JSON file as a data source.

    Parameters
    ----------
    repositoryName : str
        The name to register the repository under.
    repositoryPath : str
        Path to the repository JSON file. ``.json`` extension is appended if missing.
    overwrite : bool
        If True, overwrite an existing repository with the same name.
    """
    self._allowWritingToDefaultProject = True  # allows the addition of datasource to the Default project.

    repositoryPath = f"{repositoryPath}.json" if "json" not in repositoryPath else repositoryPath
    self.addDataSource(dataSourceName=repositoryName, resource=os.path.abspath(repositoryPath),
                       dataFormat=self.datatypes.JSON_DICT, overwrite=overwrite)
    self._allowWritingToDefaultProject = False
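
As the docstring describes, the `.json` suffix should be appended only when the extension is actually missing. A standalone sketch of that check (the helper name is hypothetical):

```python
def ensure_json_suffix(path: str) -> str:
    """Append '.json' unless the path already ends with it."""
    return path if path.endswith(".json") else f"{path}.json"
```

Testing `endswith(".json")` rather than `"json" in path` avoids skipping the suffix for paths that merely contain the substring, e.g. a directory named `jsondata`.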

getRepository(repositoryName)

Load and return a repository's JSON content by name.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| repositoryName | str | The name of the registered repository. | required |

Returns:

| Type | Description |
|------|-------------|
| dict | The parsed repository JSON. |

Source code in hera/utils/data/toolkit.py
def getRepository(self, repositoryName):
    """
    Load and return a repository's JSON content by name.

    Parameters
    ----------
    repositoryName : str
        The name of the registered repository.

    Returns
    -------
    dict
        The parsed repository JSON.
    """
    logger = get_classMethod_logger(self, "getRepository")
    logger.info(f"Trying to find repository {repositoryName} in project {self.DEFAULTPROJECT}")
    repo = self.getDataSourceData(datasourceName=repositoryName)

    return loadJSON(repo)

loadAllDatasourcesInRepositoryJSONToProject(projectName: str, repositoryJSON: dict, basedir: str = '', overwrite: bool = False, auto_register_missing: bool = True)

Iterate through the repository JSON and for each toolkit:

- Try to get an instance via ToolkitHome.getToolkit.
- If missing and auto_register_missing=True, attempt auto-register ONLY if there is a clear classpath hint in the JSON (Registry.classpath or Registry.cls).
- After we have a valid instance, dispatch to the appropriate handler per section.


resolveDataSourcePaths(repositoryJSON, basedir='') staticmethod

Walk a repository JSON dict and resolve every resource field to an absolute path, respecting the isRelativePath flag on each entry.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| repositoryJSON | dict | The parsed repository JSON (toolkit-name -> section dict). | required |
| basedir | str | The base directory against which relative paths are resolved. Typically the directory that contains the repository JSON file. | '' |

Returns:

| Type | Description |
|------|-------------|
| dict | A deep copy of repositoryJSON with all resource fields converted to absolute paths. |

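
A standalone rendering of the traversal described above (same logic, outside the class) makes the behavior easy to check:

```python
import copy
import os

def resolve_paths(repository_json, basedir=""):
    # Same walk as resolveDataSourcePaths: deep-copy the dict, unwrap the
    # optional "item" layer, and absolutize "resource" when isRelativePath is
    # the string 'True' or the boolean True.
    resolved = copy.deepcopy(repository_json)
    for toolkit_dict in resolved.values():
        if not isinstance(toolkit_dict, dict):
            continue
        for section_dict in toolkit_dict.values():
            if not isinstance(section_dict, dict):
                continue
            for item_desc in section_dict.values():
                if not isinstance(item_desc, dict):
                    continue
                item = item_desc.get("item", item_desc)
                if "resource" not in item:
                    continue
                is_rel = item_desc.get("isRelativePath", item.get("isRelativePath"))
                if is_rel == "True" or is_rel is True:
                    item["resource"] = os.path.abspath(
                        os.path.join(basedir, item["resource"]))
    return resolved
```

Because the walk operates on a deep copy, the input dict is left untouched; only the returned dict carries absolute paths.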

loadRepositoryFromPath(json_path) staticmethod

Read a repository JSON file directly from disk, resolve all relative resource paths to absolute paths based on the JSON file's directory, and return the resulting dict.

This allows tests (and lightweight scripts) to work with repository data without going through addRepository + MongoDB storage.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| json_path | str | Path to the repository JSON file. | required |

Returns:

| Type | Description |
|------|-------------|
| dict | The repository dict with all resource paths resolved to absolute. |

Raises:

| Type | Description |
|------|-------------|
| FileNotFoundError | If json_path does not exist. |
