Skip to content

Simulations Toolkits API

Toolkits for running and managing computational simulations.

OpenFOAM

OFToolkit

hera.simulations.openFoam.toolkit.OFToolkit

Bases: hermesWorkflowToolkit

The goal of this toolkit is to provide the functions that are required to run workflows, and to manage the workflows in the DB.

This toolkit might rely on the hermes project in order to manipulate the nodes of the workflow. (TBD).

Source code in hera/simulations/openFoam/toolkit.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
class OFToolkit(hermesWorkflowToolkit):
    """
        The goal of this toolkit is to provide the functions that are required to run workflows,
        and to manage the workflows in the DB.

        This toolkit might rely on the hermes project in order to manipulate the nodes
        of the workflow. (TBD).

    """
    # Temporal discretization modes of a simulation.
    TIME_STEADYSTATE = "steadyState"
    TIME_DYNAMIC = "dynamic"

    # Flow/field/case-type constants, re-exported from the module level so callers
    # can reach them through the toolkit instance (e.g. toolkit.FLOWTYPE_COMPRESSIBLE).
    FLOWTYPE_COMPRESSIBLE = FLOWTYPE_COMPRESSIBLE
    FLOWTYPE_INCOMPRESSIBLE = FLOWTYPE_INCOMPRESSIBLE
    FLOWTYPE_DISPERSION = FLOWTYPE_DISPERSION

    FIELDTYPE_SCALAR = FIELDTYPE_SCALAR
    FIELDTYPE_VECTOR = FIELDTYPE_VECTOR
    FIELDTYPE_TENSOR = FIELDTYPE_TENSOR

    CASETYPE_DECOMPOSED = CASETYPE_DECOMPOSED
    CASETYPE_RECONSTRUCTED = CASETYPE_RECONSTRUCTED

    # Document type used for flow-dispersion records in the DB.
    DOCTYPE_OF_FLOWDISPERSION = "flowDispersion"

    # Solver extensions; assigned real objects in __init__.
    stochasticLagrangian = None

    buoyantReactingFoam = None

    def __init__(self, projectName, filesDirectory=None, connectionName=None):
        """Initialize the OpenFOAM toolkit with solver extensions and object home.

        Parameters
        ----------
        projectName : str
            The name of the project in the DB (forwarded to the base toolkit).
        filesDirectory : str, optional
            The directory for the toolkit files (forwarded to the base toolkit).
        connectionName : str, optional
            The DB connection name (forwarded to the base toolkit).
        """
        super().__init__(projectName=projectName,
                         filesDirectory=filesDirectory,
                         toolkitName="OFworkflowToolkit", 
                         connectionName=connectionName)

        # Factory/home for OpenFOAM field and object construction.
        self.OFObjectHome = OFObjectHome()
        # NOTE(review): Presentation receives self.analysis — presumably a property
        # exposing _analysis on the base class; confirm against hermesWorkflowToolkit.
        self._analysis = Analysis(self)
        self._presentation = Presentation(self, self.analysis)
        # Solver-specific extensions (see class attributes above).
        self.stochasticLagrangian = StochasticLagrangianSolver_toolkitExtension(self)
        self.buoyantReactingFoam  = buoyantReactingFoam_toolkitExtension(self)

    def runOFSimulation(self,nameOrWorkflowFileOrJSONOrResource):
        """
            Build the workflow and then run the simulation(s).

            Builds the case(s) from the DB, then executes the ./Allrun script of
            each matching case directory.

        Parameters
        ----------
        nameOrWorkflowFileOrJSONOrResource
            The workflow name, workflow file, JSON or resource that identifies the case(s).

        Returns
        -------
        None
        """
        logger = get_classMethod_logger(self,"runOFSimulation")
        logger.info("Building the case")
        self.executeWorkflowFromDB(nameOrWorkflowFileOrJSONOrResource)

        logger.info("Executing the cases")
        docList = self.getWorkflowListDocumentFromDB(nameOrWorkflowFileOrJSONOrResource)
        originalDirectory = os.getcwd()
        try:
            for doc in docList:
                logger.info(f"Executing {doc.desc['workflowName']}")
                os.chdir(doc.resource)
                os.system("./Allrun")
        finally:
            # The original implementation left the process inside the last executed
            # case directory; restore the working directory even if a run fails.
            os.chdir(originalDirectory)

    def prepareSlurmWorkflowExecution(self,baseConfiguration,
                              jsonVariations,
                              slurmExecutionFileName="submit_all.sh",
                              caseListFileName="cases.txt",
                              allocateProcessorsPerRun=None,
                              memoryInGB=None,
                              jobName="foam_cases",
                              exclusive=False,
                              addAllRun=True):
        """
            Adds the different configurations to the workgroup,

        Parameters
        ----------
        baseConfiguration : dict
                basic hermes workflow to run
        jsonVariations :
                Variation file (using the jsonutils variations) format.
        slurmExecutionFileName: str
                The name of the bash file to create with the slurm batch run
        caseListFileName:
                The batchfile uses case file name, so add it.
        allocateProcessorsPerRun : int | None
                How many nodes(currently just threads) are used per job, in slurm documentation they describe "This option advises ... that job steps run ... will launch a maximum of number tasks and to provide for sufficient resources."
        memoryInGB : str | int | none
                Should memory be limited, affect depends on configuration(might kill if memory use is exceeded)
        jobName : bool
                name for slurm job
        exclusive : bool
                Should slurm run one job at a time on a GRES(Generic RESource in our case CPUs)
        addAllRun: bool
                Should slurm do Allrun
        Returns
        -------

        """

        logger = get_classMethod_logger(self,"prepareSlurmWorkflowExecution")
        caseList = ""
        if isinstance(baseConfiguration,str):
            logger.info(f"Assuming {baseConfiguration} is workflow name")
            workflow=self.getWorkflowDocumentByName(baseConfiguration)
            baseConfiguration = workflow['desc']['workflow']
        elif not isinstance(baseConfiguration, dict):
            logger.error("Slurm preparation can only handle base workflow contents or workflow name")

        if isinstance(jsonVariations, str):
            logger.info(f"Assuming {jsonVariations} is path to variations file")
            with open(jsonVariations, 'r') as variationsFile:
                jsonVariations = json.load(variationsFile)
        elif not isinstance(jsonVariations, dict):
            logger.error("Slurm preparation only supports json variation input as path or dict")


        for jsonConfig in JSONVariations(baseConfiguration, jsonVariations):
            doc = self.addWorkflowToGroup(workflowJSON=jsonConfig,
                                          groupName=workflow['desc']['groupName'],
                                          writeWorkflowToFile=True)
            caseList += f"{doc.desc['workflowName']}\n"
            logger.info(f"Adding {doc.desc['workflowName']}")

        allRunPart = """cd "$dir" || {{ echo "Directory $dir not found"; exit 1; }}

# Run the Allrun script
bash Allrun"""

        script =f"""
hera-workflows sync --force "$dir"; hera-workflows buildExecute "$dir"

{allRunPart if addAllRun else ""}
        """
        caseListFilePath = os.path.join(self.filesDirectory, caseListFileName)

        logger.info(f"Writing case list file for group {workflow['desc']['groupName']}")

        with open(caseListFilePath,"w") as outputFile:
            outputFile.write(caseList)
        slurm.prepareSlurmScriptExecution(script=script,
                                          slurmExecutionFilePath=os.path.join(self.filesDirectory,slurmExecutionFileName),
                                          jobDirListFilePath=caseListFilePath,
                                          allocateProcessorsPerRun=allocateProcessorsPerRun,
                                          memoryInGB=memoryInGB,
                                          jobName=jobName,
                                          quiet=False,
                                          exclusive=exclusive)



    def processorList(self, caseDirectory):
        """
            Return the names of the processor* decomposition directories of the case.

        Parameters
        ----------
        caseDirectory : str
            Path to the case directory.

        Returns
        -------
        list of str
            The basenames of the matched entries (e.g. 'processor0', 'processor1').
        """
        pattern = os.path.join(caseDirectory, "processor*")
        names = []
        for processorPath in glob.glob(pattern):
            names.append(os.path.basename(processorPath))
        return names

    def getHermesWorkflow_Flow(self, workflowfile):
        """
            Wrap the requested JSON workflow file in an Eulerian workflow object.

        Parameters
        ----------
        workflowfile
            The JSON workflow file to load.

        Returns
        -------
        workflow_Eulerian
            The workflow object built from the file.
        """
        hermesWorkflow = workflow_Eulerian(workflowfile)
        return hermesWorkflow

    def getMeshFromName(self,nameOrWorkflowFileOrJSONOrResource,readParallel=True, time=0):
        """
            Returns the cell-centers mesh of the case that the workflow describes.

        Parameters
        ----------
        nameOrWorkflowFileOrJSONOrResource : string or dict
            The name/dict that defines the item.

        readParallel : bool
                If a parallel (decomposed) case exists, read it.

        time : float
            The time to read the mesh from (relevant for mesh-moving cases).

        Returns
        -------
            The cell-centers dataframe of getMesh, or None if the workflow is not in the DB.
        """
        docList = self.getWorkflowDocumentFromDB(nameOrWorkflowFileOrJSONOrResource)
        if len(docList)==0:
            return None
        doc = docList[0]

        # Forward readParallel and time; the original accepted them but silently ignored them.
        return self.getMesh(doc.getData(), readParallel=readParallel, time=time)

    def getMesh(self, caseDirectory, readParallel=True, time=0):
        """
            Reads the mesh from the mesh directory.

            Reads the decomposed case if it exists and parallel is true,
            otherwise, reads just the single case.

            Unfortunately, we have to use the OF postProcess utility in order to interpolate the
            mesh points to their centers.

        Parameters
        ----------
            caseDirectory: str
                    The path to the case. Should be absolute in order to determine whether we need to add the -case to the postProcess.

            readParallel: bool
                    If parallel case exists, read it .

            time : float
                The time to read the mesh from. (relevant for mesh moving cases).

        Returns
        -------
            pandas dataframe with the points in the columns             x,y,z
            the index column (don't mix up with the index of pandas)  is the sequential number of the point.

            If the case is decomposed, return processorNumber and index columns.
            The index is the internal order in the processor.
        """

        # 1. Run the postProcess utility to set the cell centers
        logger = get_classMethod_logger(self, "getMesh")
        logger.info(f"Start. case {caseDirectory}. Current directory is : {os.getcwd()}.")

        # Only pass -case when we are not already inside the case directory.
        casePointer = "" if caseDirectory == os.getcwd() else f"-case {caseDirectory}"

        useParallel = False
        if readParallel:
            logger.debug(f"Attempt to load parallel case")
            # Check if the case is decomposed, if it is, run it.
            # The presence of processor0 is used as the marker of a decomposed case.
            proc0dir = os.path.join(caseDirectory, "processor0")

            if os.path.exists(proc0dir):
                logger.debug(f"Found parallel case, using decomposed case")
                useParallel = True
            else:
                logger.debug(f"parallel case NOT found. Using composed case")

        # Calculating the cell centers.
        # The "C" field file at the requested time is the output of writeCellCentres;
        # its existence is used to decide whether the utility must be (re)run.
        checkPath = os.path.join(caseDirectory, "processor0", str(time), "C") if useParallel else os.path.join(
            caseDirectory, str(time), "C")
        parallelExec = "-parallel" if useParallel else ""
        caseType = "decomposed" if useParallel else "composed"
        if not os.path.exists(checkPath):
            logger.debug(f"Cell centers does not exist in {caseType} case. Calculating...")
            # foamJob requires the OpenFOAM environment to be sourced in the shell.
            os.system(f"foamJob {parallelExec} {casePointer} -wait postProcess -func writeCellCentres  -time {time}")
            logger.debug(f"done: foamJob {parallelExec} -wait postProcess -func writeCellCentres {casePointer} -time {time}")
            if not os.path.exists(checkPath):
                logger.error("Error running the writeCellCentres. Executing writeCellCentres failed. Are you sure that the openFOAM environment is set?"\
                             "try to load the enviroment and then rerun (in jupyter, you need to restart the server)")

                raise RuntimeError("Error running the writeCellCentres. Check mesh")
        else:
            logger.debug(f"Cell centers exist in {caseType} case.")

        logger.debug(f"Loading the cell centers in time {time}. Using {caseType}")
        # NOTE(review): readParallel (the caller's request) is forwarded here rather than
        # useParallel (the detected state); presumably readFieldFromCase falls back to the
        # composed case on its own when no processor directories exist — confirm.
        cellCenters = self.OFObjectHome.readFieldFromCase(fieldName="cellCenters",
                                                         flowType=FLOWTYPE_INCOMPRESSIBLE,
                                                         caseDirectory=caseDirectory,
                                                         timeStep=time,
                                                         readParallel=readParallel)
        return cellCenters

    def getMeshExtentFromName(self,nameOrWorkflowFileOrJSONOrResource,readParallel=True, time=0):
        """
            Returns the bounding box of the mesh of the case that the workflow describes.

            Note: readParallel and time are currently accepted but not used —
            getMeshExtent reads constant/polyMesh/points of the case directly.

        Parameters
        ----------
        nameOrWorkflowFileOrJSONOrResource : string or dict
            The name/dict that defines the item.

        readParallel: bool
                Currently unused (kept for interface symmetry with getMeshFromName).

        time : float
            Currently unused (kept for interface symmetry with getMeshFromName).

        Returns
        -------
            The bounds dict of getMeshExtent, or None if the workflow is not in the DB.

        """
        docList = self.getWorkflowDocumentFromDB(nameOrWorkflowFileOrJSONOrResource)
        if len(docList)==0:
            return None
        else:
            doc = docList[0]

        return self.getMeshExtent(doc.getData())

    def read_points_file(self,path):
        """Parse an ASCII OpenFOAM points file and return the coordinates as a numpy array.

        Parameters
        ----------
        path : str
            Path to the constant/polyMesh/points file.

        Returns
        -------
        numpy.ndarray
            One row per point, with the x,y,z coordinates as columns.
        """
        with open(path) as pointsFile:
            content = pointsFile.readlines()

        # Locate the point count: the first line that consists solely of digits (e.g. "1606203").
        countLineIndex = next(i for i, text in enumerate(content) if text.strip().isdigit())

        coordinates = []
        # The point list opens two lines below the count line.
        for rawLine in content[countLineIndex + 2:]:
            entry = rawLine.strip()
            if entry == ")":      # closing parenthesis terminates the list
                break
            if entry.startswith("(") and entry.endswith(")"):
                xText, yText, zText = entry[1:-1].split()
                coordinates.append([float(xText), float(yText), float(zText)])
        return numpy.array(coordinates)


    def getMeshExtent(self,caseDirectory):
        """Return the axis-aligned bounding box of the mesh.

        Reads constant/polyMesh/points of the case and computes the coordinate
        extrema along each axis.

        Parameters
        ----------
        caseDirectory : str
            Path to the case directory.

        Returns
        -------
        dict
            {"x": (xmin, xmax), "y": (ymin, ymax), "z": (zmin, zmax)}

        Raises
        ------
        FileNotFoundError
            If the points file does not exist.
        """
        points_path = os.path.join(caseDirectory,"constant","polyMesh","points")
        if not os.path.exists(points_path):
            raise FileNotFoundError(f"File not found: {points_path}")

        # Parse the points file into an array of coordinates.
        meshPoints = self.read_points_file(points_path)

        # Per-axis extrema.
        minima = meshPoints.min(axis=0)
        maxima = meshPoints.max(axis=0)

        return {
            "x": (minima[0], maxima[0]),
            "y": (minima[1], maxima[1]),
            "z": (minima[2], maxima[2]),
        }

    def createEmptyCase(self, caseDirectory: str, fieldList: list, flowType: str, additionalFieldsDescription=None):
        """
            Creates an empty case directory for the simulation.
            fieldList is the list of fields to create in the case directory.

            The simulation type (compressible, incompressible, dispersion) is needed to get the dimensions and components
            of the fields. If the fields are not in the standard list then their description can be supplied in the
            additionalFieldsDescription parameter.

        Parameters
        ----------
        caseDirectory : str
            The case directory to create

        fieldList : list
            The list of field names to create

        flowType : str
            compressible, incompressible or dispersion.
            The dimension of the fields is determined by the type of simulation

        additionalFieldsDescription : dict | str | None
            Definition of additional fields:
            has the structure :
            {
                dimensions : {kg : .., m : ..},
                componentNames : None|list
            }
            the keys for the dimensions are kg,m,s,K,mol,A,cd

            Can also be a JSON file name.
            Default is None (no additional fields); the original mutable dict() default
            was replaced to avoid the shared-mutable-default pitfall.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If caseDirectory exists as a regular file.
        """
        logger = get_classMethod_logger(self,"createEmptyCase")
        logger.info(f"Making case {caseDirectory} with fields {','.join(fieldList)}")

        # Refuse to proceed if a *file* occupies the requested directory name.
        if os.path.isfile(caseDirectory):
            raise ValueError(
                f"The file {caseDirectory} exists as a file. Cannot create a directory. Please remove/rename it and rerun. ")

        # Standard OpenFOAM case skeleton, plus the 0.orig / 0.parallel staging directories.
        for subDirectory in ["constant",
                             "system",
                             os.path.join("constant", "triSurface"),
                             "0",
                             "0.orig",
                             "0.parallel"]:
            os.makedirs(os.path.join(caseDirectory, subDirectory), exist_ok=True)

        # Register any user-supplied field definitions before creating the fields.
        fileaddition = dict()
        if additionalFieldsDescription is not None:
            fileaddition = loadJSON(additionalFieldsDescription)

        for fieldName, fieldDefs in fileaddition.items():
            logger.info(f"Adding temporary field {fieldName} to the field directory")
            self.OFObjectHome.addFieldDefinitions(fieldName=fieldName, **fieldDefs)

        # Makes the empty fields in 0, 0.orig and 0.parallel
        # (the latter with the processor boundary entries).
        for fieldName in fieldList:
            logger.info(f"Creating field {fieldName}")
            self.writeEmptyField(fieldName=fieldName,flowType=flowType,caseDirectory=caseDirectory,timeOrLocation=0)
            self.writeEmptyField(fieldName=fieldName,flowType=flowType,caseDirectory=caseDirectory,timeOrLocation="0.orig")
            self.writeEmptyField(fieldName=fieldName,flowType=flowType,caseDirectory=caseDirectory,timeOrLocation="0.parallel",writeProcBoundary=True)

    def writeEmptyField(self,fieldName,flowType,caseDirectory,timeOrLocation=0,readBoundaryFromCase=False,writeProcBoundary=False):
        """
            Writes an empty field file into the case.

            When readBoundaryFromCase is True, the boundary names are read from the
            case itself; otherwise the general ".*" boundary entry is written.

        Parameters
        ----------
        fieldName : str
            The name of the field.
        flowType : str
            The flow type (compressible/incompressible).
        caseDirectory : str
            The case directory to write into.
        timeOrLocation : float | str  [default : 0]
            The time (or directory name, e.g. "0.orig") to write the new field in.
        readBoundaryFromCase : bool
            If True, tries to read the boundary names from the case;
            otherwise writes the general ".*" boundary entry.
        writeProcBoundary : bool
            If True, adds the processor boundary entries.
            If readBoundaryFromCase is True, the specific proc of each processor is
            written (when parallel); otherwise the "proc.*" boundary entry.

        Returns
        -------
        None
        """
        logger = get_classMethod_logger(self,"writeEmptyField")
        logger.info(f"Creating the field: {fieldName}. ")

        # Build the zero-initialized field object for the requested flow type.
        emptyField = self.OFObjectHome.getEmptyField(fieldName=fieldName, flowType=flowType)

        if readBoundaryFromCase:
            emptyField.readBoundariesFromCase(caseDirectory,readParallel=True)
        if writeProcBoundary:
            emptyField.addProcBoundary()

        emptyField.writeToCase(caseDirectory=caseDirectory, timeOrLocation=timeOrLocation)

    #############################################################

    def template_add(self, name, objFile, workflowObj=None):
        """
            Adds a template to the toolkit. (NOT IMPLEMENTED — placeholder.)

            Templates can be
                - Flow : Holds Hermes flow templates.
                - Node : Holds a hermes node objects
                - Field : Holds a field templates.
                        This can be
                            * xarray
                            * pandas/dask
                            * constant

        Parameters
        ----------
        name
            The template name.
        objFile
            The file that holds the template object.
        workflowObj
            Optional workflow object to attach to the template.

        Returns
        -------
        None
        """
        pass

    def xarrayToSetFieldsDictDomain(self, xarrayData, xColumnName="x", yColumnName="y", zColumnName="z", time=None,
                                    timeColumn="time", **kwargs):
        """
            Converts the xarray to the setFields dict entries of the internal domain.

            For every cell of the (structured) coordinate grid, a boxToCell entry is
            emitted with the field values sampled at that cell.

        Parameters
        ----------
        xarrayData : xarray dataset
                The data set as xarray.
        xColumnName : string
            The coordinate name of the x-axis.
            If None, ignore that coordinate (the mesh should be in the right dimension).
        yColumnName : string
            The coordinate name of the y-axis.
            If None, ignore that coordinate (the mesh should be in the right dimension).
        zColumnName : string
            The coordinate name of the z-axis.
            If None, ignore that coordinate (the mesh should be in the right dimension).
        time : string
            If not None, select the closest timestep (nearest neighbour).
            If None, ignore the time index.
        timeColumn : string
            The name of the time coordinate in the dataset.
        kwargs : dict
            A mapping of OpenFoamField -> dataset Field.
            If the dataset Field is a tuple/list, then write it as vector (3 components)
            or tensor (9 components).
            Each component can be either a field name (string) or a number (fixed value).

            so
                U = ("u","v",0)
                will map the field 'u' as Ux, 'v' as Uy and Uz=0


        Returns
        -------
            string (the boxToCell entries of a setFields dict).

        """
        logger = get_classMethod_logger(self, "xarrayToSetFieldsDict")
        logger.debug(f"------------ Start  : {logger.name} ")

        # Build the list of coordinate axes to iterate over.
        # For each active axis, store the number of cells (shape - 1, since
        # we iterate cell intervals, not cell centers).
        cellCounts = []
        coordNames = []
        for columnName in (xColumnName, yColumnName, zColumnName):
            if columnName is not None:
                cellCounts.append(xarrayData.coords[columnName].shape[0] - 1)
                coordNames.append(columnName)

        # Format a sequence of numbers as an OpenFOAM vector string.
        # str() is applied per component; the original join raised TypeError on floats.
        arryToOFvector = lambda arry: f"({' '.join(str(component) for component in arry)} )"

        # If a time is requested, select the nearest timestep ONCE, hoisted out of
        # the cell loop (the original re-selected for every cell and field).
        if time is not None:
            selectedData = xarrayData.sel(**{timeColumn: time}, method='nearest')
        else:
            selectedData = xarrayData

        ret = []

        # Iterate over all cells using the Cartesian product of index ranges.
        # (The original passed the raw counts to product(), which raises TypeError;
        #  they must be wrapped in range()).
        # Each X is a tuple of indices (ix, iy, iz) defining one cell; for each cell,
        # compute the bounding box (lowerLeft, upperRight) from the coordinate arrays.
        for X in product(*[range(count) for count in cellCounts]):
            lowerLeft = [xarrayData.coords[coordNames[coordIndx]][ind].item() for coordIndx, ind in enumerate(X)]
            upperRight = [xarrayData.coords[coordNames[coordIndx]][ind + 1].item() for coordIndx, ind in enumerate(X)]

            # The isel accessor dict for this cell (e.g. {"x": 3, "y": 5}).
            accessDict = dict(zip(coordNames, X))

            fieldValueStr = ""
            for OFField, fieldName in kwargs.items():
                # A tuple/list mapping denotes a vector/tensor field. The check must be
                # against tuple/list explicitly: a plain string is also Iterable, and the
                # original isinstance(..., Iterable) sent scalar field names down this branch.
                if isinstance(fieldName, (tuple, list)):
                    components = []
                    for mappedField in fieldName:
                        if isinstance(mappedField, str):
                            # (the original appended into a list that shadowed the dataset)
                            components.append(selectedData[mappedField].isel(**accessDict).item())
                        elif isinstance(mappedField, (float, int)):
                            components.append(mappedField)
                        else:
                            err = f"The mapping {OFField}->{fieldName} contains a value that is not a string or number. "
                            raise ValueError(err)
                    if len(fieldName) == 3:
                        fieldValueStr += f"volVectorFieldValue {OFField} {arryToOFvector(components)}\n"
                    elif len(fieldName) == 9:
                        fieldValueStr += f"volTensorFieldValue {OFField} {arryToOFvector(components)}\n"
                    else:
                        err = f"The number of components in the  mapping {OFField}->{fieldName} must be 3 or 9. got {len(fieldName)}."
                        raise ValueError(err)
                else:
                    # Scalar field: the mapping value is the dataset field name.
                    value = selectedData[fieldName].isel(**accessDict).item()

                    fieldValueStr += f"volScalarFieldValue {OFField} {value}\n"

            BoxRecord = f"""
                boxToCell 
                {{
                    box {arryToOFvector(lowerLeft)} {arryToOFvector(upperRight)};
                    fieldValues 
                    (
                        {fieldValueStr}

                    );                  
                }}
            """
            ret.append(BoxRecord)
        return "\n".join(ret)

    def getVTKPipelineCacheDocuments(self, regularMesh=None, filterName=None, workflowName=None, groupName=None):
        """
            Return the list of cached VTK-pipeline documents that match the given criteria.

            Every criterion that is None is ignored.

        Parameters
        ----------
        regularMesh
            Restrict to documents of this regular mesh.
        filterName
            Restrict to documents of this filter.
        workflowName
            Restrict to documents of this simulation workflow.
        groupName
            Restrict to documents of this simulation group.

        Returns
        -------
            The matching cache documents.
        """
        criteria = dict()
        if regularMesh is not None:
            criteria['regularMesh'] = regularMesh

        if filterName is not None:
            criteria['filterName'] = filterName

        if workflowName is not None:
            criteria.setdefault("simulation", dict())['workflowName'] = workflowName

        if groupName is not None:
            criteria.setdefault("simulation", dict())['groupName'] = groupName

        return self.getCacheDocuments(type=TYPE_VTK_FILTER, **dictToMongoQuery(criteria))

    def getVTKPipelineCacheTable(self,regularMesh=None, filterName=None, workflowName=None, groupName=None):
        """
            Return the matching cached VTK-pipeline documents as a pandas table.

        Parameters
        ----------
        regularMesh
            Restrict to documents of this regular mesh (None to ignore).
        filterName
            Restrict to documents of this filter (None to ignore).
        workflowName
            Restrict to documents of this workflow (None to ignore).
        groupName
            Restrict to documents of this group (None to ignore).

        Returns
        -------
        pandas.DataFrame
            One row per cache document, with filterName, workflowName and groupName columns.
        """
        docList = self.getVTKPipelineCacheDocuments(regularMesh=regularMesh, filterName=filterName,
                                                    workflowName=workflowName, groupName=groupName)
        rows = []
        for doc in docList:
            simulationDesc = doc.desc['simulation']
            rows.append(dict(filterName=doc.desc['filterName'],
                             workflowName=simulationDesc['workflowName'],
                             groupName=simulationDesc['groupName']))
        return pandas.DataFrame(rows)

    def clearVTKPipelineCache(self, regularMesh=None, filterName=None, workflowName=None, groupName=None):
        """
            Deletes the matching cache documents and their data from the disk.
            Use with care!

        Parameters
        ----------
        regularMesh
            Restrict to documents of this regular mesh (None to ignore).
        filterName
            Restrict to documents of this filter (None to ignore).
        workflowName
            Restrict to documents of this workflow (None to ignore).
        groupName
            Restrict to documents of this group (None to ignore).

        Returns
        -------
        None
        """

        # Name the logger after this method; the original used the stale name "clearCache",
        # inconsistent with every other method in the class.
        logger = get_classMethod_logger(self, "clearVTKPipelineCache")

        # 1. Get the potential filters to process.
        docList = self.getVTKPipelineCacheDocuments(regularMesh = regularMesh, filterName = filterName, workflowName= workflowName, groupName= groupName)

        logger.info(f"Found {len(docList)} documents to delete. ")

        # 2. Remove the resources from the disk first...
        for doc in docList:
            logger.debug(f"Deleting resource {doc['desc']['filterName']} : {doc['desc']['pipeline']['filters']} ")
            outputFile = doc['resource']

            if os.path.exists(outputFile):
                if os.path.isfile(outputFile):
                    os.remove(outputFile)
                else:
                    shutil.rmtree(outputFile)

        # 3. ...then delete the documents from the DB.
        for doc in docList:
            doc.delete()

    def getTimeList(self,nameOrWorkflowFileOrJSONOrResourceorDirectory,singleProcessor=False,returnFirst=True):
        """
            Extract the computed time steps from the case.
            Uses the decomposed (parallel) layout when processor directories exist,
            unless singleProcessor is True.

        Parameters
        ----------
        nameOrWorkflowFileOrJSONOrResourceorDirectory : str
                The case directory, name, json, or json-file of a case.
        singleProcessor : bool
                If True, always read the time list from the reconstructed case root.
        returnFirst : bool
                If True, return the time list of the first case only;
                otherwise a dict workflowName -> time list.

        Returns
        -------
        list | dict
            Sorted time values (floats) of the case(s).

        Raises
        ------
        ValueError
            If the case is found neither in the DB nor on the disk.
        """
        logger = get_classMethod_logger(self,"getTimeList")
        logger.info("Getting the simulation from the database")
        docList = self.getWorkflowDocumentFromDB(nameOrWorkflowFileOrJSONOrResource=nameOrWorkflowFileOrJSONOrResourceorDirectory)
        if len(docList) ==0 :
            logger.info(f"Simulation not found, trying to interpret as a directory on the disk")
            if os.path.exists(nameOrWorkflowFileOrJSONOrResourceorDirectory):
                workflowFile = os.path.basename(nameOrWorkflowFileOrJSONOrResourceorDirectory)
                workflowName = os.path.splitext(workflowFile)[0]
                casePathList = [(workflowName, nameOrWorkflowFileOrJSONOrResourceorDirectory)]
            else:
                err = f"Case {nameOrWorkflowFileOrJSONOrResourceorDirectory} not found in DB or on disk"
                logger.error(err)
                raise ValueError(err)
        else:
            casePathList = [(x.desc['workflowName'],x.getData()) for x in docList]

        ret = dict()
        for name,case in casePathList:

            processorList = [os.path.basename(proc) for proc in glob.glob(os.path.join(case, "processor*"))]
            if len(processorList) == 0 or singleProcessor:
                # Reconstructed layout: time directories live directly under the case root.
                timeRoot = case
            else:
                # Decomposed layout: read the times of the first processor directory.
                timeRoot = os.path.join(case, processorList[0])
            ret[name] = self._listTimeDirectories(timeRoot)

        return ret[casePathList[0][0]] if returnFirst else ret

    def _listTimeDirectories(self, timeRoot):
        """Return the sorted (float) time values of the time directories under timeRoot.

        Fixes of the original inline logic:
          - the single-processor branch tested os.path.isdir(x) relative to the CWD
            instead of the case directory (the parallel branch joined correctly);
          - x.isdigit() rejected fractional OpenFOAM time directories such as "0.5",
            although the values were converted to float anyway;
          - the sort key int(x) collapsed sub-integer ordering.
        """
        # Directory names that can never be time directories.
        nonTimeNames = {"constant", "system", "rootCase", "VTK"}
        times = []
        for entry in os.listdir(timeRoot):
            if entry in nonTimeNames or entry.startswith("processor"):
                continue
            if not os.path.isdir(os.path.join(timeRoot, entry)):
                continue
            try:
                # Names like "0.orig" or "postProcessing" fail the float conversion
                # and are skipped.
                times.append(float(entry))
            except ValueError:
                continue
        return sorted(times)

__init__(projectName, filesDirectory=None, connectionName=None)

Initialize the OpenFOAM toolkit with solver extensions and object home.

Source code in hera/simulations/openFoam/toolkit.py
def __init__(self, projectName, filesDirectory=None, connectionName=None):
    """
        Initialize the OpenFOAM toolkit with solver extensions and object home.

    Parameters
    ----------
    projectName : str
        The name of the project; forwarded to the base hermes workflow toolkit.
    filesDirectory : str, optional
        The directory in which the toolkit stores its files (default None).
    connectionName : str, optional
        The DB connection name to use (default None).
    """
    super().__init__(projectName=projectName,
                     filesDirectory=filesDirectory,
                     toolkitName="OFworkflowToolkit", 
                     connectionName=connectionName)

    # Home/factory object for OpenFOAM field and object definitions.
    self.OFObjectHome = OFObjectHome()
    # Analysis is created first: Presentation consumes self.analysis
    # (presumably a property backed by _analysis — defined outside this view).
    self._analysis = Analysis(self)
    self._presentation = Presentation(self, self.analysis)
    # Solver-specific toolkit extensions.
    self.stochasticLagrangian = StochasticLagrangianSolver_toolkitExtension(self)
    self.buoyantReactingFoam  = buoyantReactingFoam_toolkitExtension(self)

runOFSimulation(nameOrWorkflowFileOrJSONOrResource)

Builds the workflow and then runs the simulation.

Parameters:

Name Type Description Default
nameOrWorkflowFileOrJSONOrResource
required
Source code in hera/simulations/openFoam/toolkit.py
def runOFSimulation(self,nameOrWorkflowFileOrJSONOrResource):
    """
        Build the workflow and then runs the simulation.

        Builds the case of every matching workflow document (executeWorkflowFromDB)
        and then executes its ./Allrun script inside the case directory.

    Parameters
    ----------
    nameOrWorkflowFileOrJSONOrResource
        The name/file/JSON/resource that identifies the workflow(s) to run.

    Returns
    -------
    None

    """
    logger = get_classMethod_logger(self,"runOFSimulation")
    logger.info("Building the case")
    self.executeWorkflowFromDB(nameOrWorkflowFileOrJSONOrResource)

    logger.info("Executing the cases")
    docList = self.getWorkflowListDocumentFromDB(nameOrWorkflowFileOrJSONOrResource)
    originalDirectory = os.getcwd()
    try:
        for doc in docList:
            logger.info(f"Executing {doc.desc['workflowName']}")
            os.chdir(doc.resource)
            # os.system returns the raw exit status; non-zero means Allrun failed.
            # The original ignored the status, hiding failed runs.
            status = os.system("./Allrun")
            if status != 0:
                logger.error(f"Allrun of {doc.desc['workflowName']} exited with status {status}")
    finally:
        # Restore the working directory; the original leaked the chdir to the
        # last case, surprising subsequent relative-path operations.
        os.chdir(originalDirectory)

prepareSlurmWorkflowExecution(baseConfiguration, jsonVariations, slurmExecutionFileName='submit_all.sh', caseListFileName='cases.txt', allocateProcessorsPerRun=None, memoryInGB=None, jobName='foam_cases', exclusive=False, addAllRun=True)

Adds the different configurations to the workgroup,

Parameters:

Name Type Description Default
baseConfiguration dict
basic hermes workflow to run
required
jsonVariations
Variation file (using the jsonutils variations) format.
required
slurmExecutionFileName
The name of the bash file to create with the slurm batch run
'submit_all.sh'
caseListFileName
The batchfile uses case file name, so add it.
'cases.txt'
allocateProcessorsPerRun int | None
How many nodes(currently just threads) are used per job, in slurm documentation they describe "This option advises ... that job steps run ... will launch a maximum of number tasks and to provide for sufficient resources."
None
memoryInGB str | int | none
Should memory be limited, affect depends on configuration(might kill if memory use is exceeded)
None
jobName str
name for slurm job
'foam_cases'
exclusive bool
Should slurm run one job at a time on a GRES(Generic RESource in our case CPUs)
False
addAllRun
Should slurm do Allrun
True
Source code in hera/simulations/openFoam/toolkit.py
    def prepareSlurmWorkflowExecution(self,baseConfiguration,
                              jsonVariations,
                              slurmExecutionFileName="submit_all.sh",
                              caseListFileName="cases.txt",
                              allocateProcessorsPerRun=None,
                              memoryInGB=None,
                              jobName="foam_cases",
                              exclusive=False,
                              addAllRun=True):
        """
            Adds the different configurations to the workgroup,

        Parameters
        ----------
        baseConfiguration : dict
                basic hermes workflow to run
        jsonVariations :
                Variation file (using the jsonutils variations) format.
        slurmExecutionFileName: str
                The name of the bash file to create with the slurm batch run
        caseListFileName:
                The batchfile uses case file name, so add it.
        allocateProcessorsPerRun : int | None
                How many nodes(currently just threads) are used per job, in slurm documentation they describe "This option advises ... that job steps run ... will launch a maximum of number tasks and to provide for sufficient resources."
        memoryInGB : str | int | none
                Should memory be limited, affect depends on configuration(might kill if memory use is exceeded)
        jobName : bool
                name for slurm job
        exclusive : bool
                Should slurm run one job at a time on a GRES(Generic RESource in our case CPUs)
        addAllRun: bool
                Should slurm do Allrun
        Returns
        -------

        """

        logger = get_classMethod_logger(self,"prepareSlurmWorkflowExecution")
        caseList = ""
        if isinstance(baseConfiguration,str):
            logger.info(f"Assuming {baseConfiguration} is workflow name")
            workflow=self.getWorkflowDocumentByName(baseConfiguration)
            baseConfiguration = workflow['desc']['workflow']
        elif not isinstance(baseConfiguration, dict):
            logger.error("Slurm preparation can only handle base workflow contents or workflow name")

        if isinstance(jsonVariations, str):
            logger.info(f"Assuming {jsonVariations} is path to variations file")
            with open(jsonVariations, 'r') as variationsFile:
                jsonVariations = json.load(variationsFile)
        elif not isinstance(jsonVariations, dict):
            logger.error("Slurm preparation only supports json variation input as path or dict")


        for jsonConfig in JSONVariations(baseConfiguration, jsonVariations):
            doc = self.addWorkflowToGroup(workflowJSON=jsonConfig,
                                          groupName=workflow['desc']['groupName'],
                                          writeWorkflowToFile=True)
            caseList += f"{doc.desc['workflowName']}\n"
            logger.info(f"Adding {doc.desc['workflowName']}")

        allRunPart = """cd "$dir" || {{ echo "Directory $dir not found"; exit 1; }}

# Run the Allrun script
bash Allrun"""

        script =f"""
hera-workflows sync --force "$dir"; hera-workflows buildExecute "$dir"

{allRunPart if addAllRun else ""}
        """
        caseListFilePath = os.path.join(self.filesDirectory, caseListFileName)

        logger.info(f"Writing case list file for group {workflow['desc']['groupName']}")

        with open(caseListFilePath,"w") as outputFile:
            outputFile.write(caseList)
        slurm.prepareSlurmScriptExecution(script=script,
                                          slurmExecutionFilePath=os.path.join(self.filesDirectory,slurmExecutionFileName),
                                          jobDirListFilePath=caseListFilePath,
                                          allocateProcessorsPerRun=allocateProcessorsPerRun,
                                          memoryInGB=memoryInGB,
                                          jobName=jobName,
                                          quiet=False,
                                          exclusive=exclusive)

processorList(caseDirectory)

Returns the list of processors directories in the case

Parameters:

Name Type Description Default
caseDirectory str

Path to the directory.

required
Source code in hera/simulations/openFoam/toolkit.py
def processorList(self, caseDirectory):
    """
        Returns the names of the processor* directories of a decomposed case.
    Parameters
    ----------
    caseDirectory : str
        Path to the directory.

    Returns
    -------
    list of str
        The directory names (e.g. ['processor0', 'processor1']); empty when
        the case is not decomposed.
    """
    found = []
    for processorPath in glob.glob(os.path.join(caseDirectory, "processor*")):
        found.append(os.path.basename(processorPath))
    return found

getHermesWorkflow_Flow(workflowfile)

Returns the workflow of the requested JSON file.

Parameters:

Name Type Description Default
workflowfile
required
Source code in hera/simulations/openFoam/toolkit.py
def getHermesWorkflow_Flow(self, workflowfile):
    """
        Builds and returns the Eulerian workflow object of the requested JSON file.
    Parameters
    ----------
    workflowfile
        The workflow JSON file to load.

    Returns
    -------
    workflow_Eulerian
        The workflow object that wraps the given file.
    """
    hermesWorkflow = workflow_Eulerian(workflowfile)
    return hermesWorkflow

getMeshFromName(nameOrWorkflowFileOrJSONOrResource, readParallel=True, time=0)

Returns the mesh (cell centers) of the requested workflow

Parameters:

Name Type Description Default
nameOrWorkflowFileOrJSONOrResource string or dict
required
The
required
readParallel
If parallel case exists, read it .
True
time float

The time to read the mesh from. (relevant for mesh moving cases).

0
Source code in hera/simulations/openFoam/toolkit.py
def getMeshFromName(self,nameOrWorkflowFileOrJSONOrResource,readParallel=True, time=0):
    """
        Returns the mesh (cell centers) of the requested workflow.
    Parameters
    ----------
    nameOrWorkflowFileOrJSONOrResource : string or dict
        The name/dict that defines the item

    readParallel: bool
            If parallel case exists, read it .

    time : float
        The time to read the mesh from. (relevant for mesh moving cases).

    Returns
    -------
        The mesh as returned by getMesh, or None when no matching workflow
        document is found.

    """
    docList = self.getWorkflowDocumentFromDB(nameOrWorkflowFileOrJSONOrResource)
    if len(docList)==0:
        return None
    doc = docList[0]

    # Forward readParallel and time to getMesh. The original dropped both,
    # silently ignoring the caller's arguments.
    return self.getMesh(doc.getData(), readParallel=readParallel, time=time)

getMesh(caseDirectory, readParallel=True, time=0)

Reads the mesh from the mesh directory.

Reads the decomposed case if it exists and parallel is true,
otherwise, reads just the single case.

Unfortunately, we have to use the OF postProcess utility in order to interpolate the
mesh points to their centers.

Returns:

Type Description
pandas dataframe with the points in the columns x,y,z

the index column (don't mix up with the index of pandas) is the sequential number of the point.

If the case is decomposed, return processorNumber and index columns. The index is the internal order in the processor.

Source code in hera/simulations/openFoam/toolkit.py
def getMesh(self, caseDirectory, readParallel=True, time=0):
    """
        Reads the mesh from the mesh directory.

        Reads the decomposed case if it exists and parallel is true,
        otherwise, reads just the single case.

        Unfortunately, we have to use the OF postProcess utility in order to interpolate the
        mesh points to their centers.

    Parameters
    ----------
        caseDirectory: str
                The path to the case. Should be absolute in order to determine whether we need to add the -case to the postProcess.

        readParallel: bool
                If parallel case exists, read it .

        time : float
            The time to read the mesh from. (relevant for mesh moving cases).

    Returns
    -------
        pandas dataframe with the points in the columns             x,y,z
        the index column (don't mix up with the index of pandas)  is the sequential number of the point.

        If the case is decomposed, return processorNumber and index columns.
        The index is the internal order in the processor.
    """

    # 1. Run the postProcess utility to set the cell centers
    logger = get_classMethod_logger(self, "getMesh")
    logger.info(f"Start. case {caseDirectory}. Current directory is : {os.getcwd()}.")

    # -case is only needed when we are not already inside the case directory.
    casePointer = "" if caseDirectory == os.getcwd() else f"-case {caseDirectory}"

    useParallel = False
    if readParallel:
        logger.debug(f"Attempt to load parallel case")
        # Check if the case is decomposed, if it is, run it.
        # Presence of processor0 marks a decomposed case.
        proc0dir = os.path.join(caseDirectory, "processor0")

        if os.path.exists(proc0dir):
            logger.debug(f"Found parallel case, using decomposed case")
            useParallel = True
        else:
            logger.debug(f"parallel case NOT found. Using composed case")

    # Calculating the cell centers
    # The 'C' file is produced by writeCellCentres; if it already exists for
    # this time step, the expensive foamJob call is skipped.
    checkPath = os.path.join(caseDirectory, "processor0", str(time), "C") if useParallel else os.path.join(
        caseDirectory, str(time), "C")
    parallelExec = "-parallel" if useParallel else ""
    caseType = "decomposed" if useParallel else "composed"
    if not os.path.exists(checkPath):
        logger.debug(f"Cell centers does not exist in {caseType} case. Calculating...")
        # Requires a sourced OpenFOAM environment (foamJob/postProcess on PATH).
        os.system(f"foamJob {parallelExec} {casePointer} -wait postProcess -func writeCellCentres  -time {time}")
        logger.debug(f"done: foamJob {parallelExec} -wait postProcess -func writeCellCentres {casePointer} -time {time}")
        if not os.path.exists(checkPath):
            logger.error("Error running the writeCellCentres. Executing writeCellCentres failed. Are you sure that the openFOAM environment is set?"\
                         "try to load the enviroment and then rerun (in jupyter, you need to restart the server)")

            raise RuntimeError("Error running the writeCellCentres. Check mesh")
    else:
        logger.debug(f"Cell centers exist in {caseType} case.")

    logger.debug(f"Loading the cell centers in time {time}. Using {caseType}")
    # NOTE(review): the user-supplied readParallel (not the detected useParallel)
    # is forwarded here; presumably readFieldFromCase re-detects decomposition
    # itself — confirm.
    cellCenters = self.OFObjectHome.readFieldFromCase(fieldName="cellCenters",
                                                     flowType=FLOWTYPE_INCOMPRESSIBLE,
                                                     caseDirectory=caseDirectory,
                                                     timeStep=time,
                                                     readParallel=readParallel)
    return cellCenters

getMeshExtentFromName(nameOrWorkflowFileOrJSONOrResource, readParallel=True, time=0)

Returns the mesh extent (bounding box) of the requested workflow

Parameters:

Name Type Description Default
nameOrWorkflowFileOrJSONOrResource string or dict
required
The
required
readParallel
If parallel case exists, read it .
True
time float

The time to read the mesh from. (relevant for mesh moving cases).

0
Source code in hera/simulations/openFoam/toolkit.py
def getMeshExtentFromName(self,nameOrWorkflowFileOrJSONOrResource,readParallel=True, time=0):
    """
        Returns the mesh extent (bounding box) of the requested workflow.

        NOTE(review): readParallel and time are accepted for symmetry with
        getMeshFromName but are currently not used — getMeshExtent reads
        constant/polyMesh/points directly.

    Parameters
    ----------
    nameOrWorkflowFileOrJSONOrResource : string or dict
        The name/dict that defines the item

    readParallel: bool
            Currently unused.

    time : float
        Currently unused.

    Returns
    -------
        The bounds dict from getMeshExtent, or None when no matching workflow
        document is found.
    """
    docList = self.getWorkflowDocumentFromDB(nameOrWorkflowFileOrJSONOrResource)
    if not docList:
        return None
    return self.getMeshExtent(docList[0].getData())

read_points_file(path)

Parse an OpenFOAM points file and return coordinates as a numpy array.

Source code in hera/simulations/openFoam/toolkit.py
def read_points_file(self,path):
    """
        Parse an OpenFOAM points file and return the coordinates as an
        (N, 3) numpy array.
    """
    with open(path) as handle:
        content = handle.readlines()

    # The header ends with a line holding only the point count (e.g. "1606203").
    countLineIndex = next(i for i, text in enumerate(content) if text.strip().isdigit())

    coords = []
    # Coordinates begin two lines below the count line (skipping the opening "(").
    for raw in content[countLineIndex + 2:]:
        entry = raw.strip()
        if entry == ")":
            # Closing parenthesis terminates the point list.
            break
        if entry.startswith("(") and entry.endswith(")"):
            x, y, z = entry[1:-1].split()
            coords.append([float(x), float(y), float(z)])
    return numpy.array(coords)

getMeshExtent(caseDirectory)

Return the bounding box of the mesh from the points file.

Source code in hera/simulations/openFoam/toolkit.py
def getMeshExtent(self,caseDirectory):
    """
        Return the axis-aligned bounding box of the mesh, computed from the
        constant/polyMesh/points file of the case.

        Returns a dict mapping 'x'/'y'/'z' to (min, max) tuples.

        Raises FileNotFoundError when the points file does not exist.
    """
    pointsFile = os.path.join(caseDirectory,"constant","polyMesh","points")
    if not os.path.exists(pointsFile):
        raise FileNotFoundError(f"File not found: {pointsFile}")

    # Parse the points file into an (N, 3) array of coordinates.
    coords = self.read_points_file(pointsFile)

    lower = coords.min(axis=0)
    upper = coords.max(axis=0)

    return {axis: (lower[i], upper[i]) for i, axis in enumerate(("x", "y", "z"))}

createEmptyCase(caseDirectory: str, fieldList: list, flowType: str, additionalFieldsDescription=dict())

Creates an empty case directory for the simulation.
fields is a list of fields to create in the case directory.

The simulation type (compressible, incompressible, dispersion) is needed to get the dimensions and components
of the fields. If the fields are not in the standard list then their description can be supplied in the
additionalFieldsDescription parameters

Parameters:

Name Type Description Default
caseDirectory str

The case directory to create

required
fieldList list

The list of field names to create

required
flowType str

compressible, incompressible or dispersion. The dimension of the fields is determined by the type of simulation

required
additionalFieldsDescription dict | str

Definition of additional fields: has the structure : { dimensions : {kg : .., m : ..}, componentNames : None|list ) the keys for the dimensions are kg,m,s,K,mol,A,cd

Can also be a JSON file name.

dict()
Source code in hera/simulations/openFoam/toolkit.py
def createEmptyCase(self, caseDirectory: str, fieldList: list, flowType: str, additionalFieldsDescription=None):
    """
        Creates an empty case directory for the simulation.
        fields is a list of fields to create in the case directory.

        The simulation type (compressible, incompressible, dispersion) is needed to get the dimensions and components
        of the fields. If the fields are not in the standard list then their description can be supplied in the
        additionalFieldsDescription parameters

    Parameters
    ----------
    caseDirectory : str
        The case directory to create

    fieldList : list
        The list of field names to create

    flowType : str
        compressible, incompressible or dispersion.
        The dimension of the fields is determined by the type of simulation

    additionalFieldsDescription : dict | str | None
        Definition of additional fields:
        has the structure :
        {
            dimensions : {kg : .., m : ..},
            componentNames : None|list
        )
        the keys for the dimensions are kg,m,s,K,mol,A,cd

        Can also be a JSON file name.

        The default was a mutable dict() (an anti-pattern); None is equivalent
        because the is-not-None guard below leaves the additions empty.

    Returns
    -------

    """
    logger = get_classMethod_logger(self,"createEmptyCase")
    logger.info(f"Making case {caseDirectory} with fields {','.join(fieldList)}")

    # Refuse to proceed when a *file* (not a directory) occupies the path.
    if os.path.isfile(caseDirectory):
        raise ValueError(
            f"The file {caseDirectory} exists as a file. Cannot create a directory. Please remove/rename it and rerun. ")

    # Create the standard OpenFOAM case skeleton.
    for parts in (("constant",), ("system",), ("constant", "triSurface"),
                  ("0",), ("0.orig",), ("0.parallel",)):
        os.makedirs(os.path.join(caseDirectory, *parts), exist_ok=True)

    fileaddition = dict()
    if additionalFieldsDescription is not None:
        fileaddition = loadJSON(additionalFieldsDescription)

    # Register the non-standard fields so getEmptyField can resolve them.
    for fieldName, fieldDefs in fileaddition.items():
        logger.info(f"Adding temporary field {fieldName} to the field directory")
        self.OFObjectHome.addFieldDefinitions(fieldName=fieldName, **fieldDefs)

    # Write each requested field, empty, into 0, 0.orig and 0.parallel
    # (the latter with processor boundaries).
    for fieldName in fieldList:
        logger.info(f"Creating field {fieldName}")
        self.writeEmptyField(fieldName=fieldName,flowType=flowType,caseDirectory=caseDirectory,timeOrLocation=0)
        self.writeEmptyField(fieldName=fieldName,flowType=flowType,caseDirectory=caseDirectory,timeOrLocation="0.orig")
        self.writeEmptyField(fieldName=fieldName,flowType=flowType,caseDirectory=caseDirectory,timeOrLocation="0.parallel",writeProcBoundary=True)

writeEmptyField(fieldName, flowType, caseDirectory, timeOrLocation=0, readBoundaryFromCase=False, writeProcBoundary=False)

Writes an empty field in the case.

If the readBoundaryFromCase is True, then the field is written with the relevant boundaries (that are read from the case).

Parameters:

Name Type Description Default
fieldName str

The name of the field

required
flowType str

The flow type (compressible/incompressible)

required
timeOrLocation float[default:0]

The time to write the new field in

0
caseDirectory str

The name of te new case directory

required
readParallel bool

If true, then attempt to write as parallel fields.

required
readBoundaryFromCase bool

If True, tries to read the boundary names from the case. Otherwise write the general ".*" boundary field.

False
writeProcBoundary bool

If true writes the proc boundaries. If readBoundaryFromCase is True, then write the specific proc for each processor (when it is parallel) Otherwise, write the "proc.*" boundary field.

False
Source code in hera/simulations/openFoam/toolkit.py
def writeEmptyField(self,fieldName,flowType,caseDirectory,timeOrLocation=0,readBoundaryFromCase=False,writeProcBoundary=False):
    """
        Writes an empty field in the case.

        If readBoundaryFromCase is True, the field is written with the relevant
        boundaries (read from the case).

    Parameters
    ----------
    fieldName : str
        The name of the field
    flowType : str
        The flow type (compressible/incompressible)
    caseDirectory : str
        The name of the new case directory
    timeOrLocation : float  [default : 0]
        The time (or location directory) to write the new field in
    readBoundaryFromCase : bool
        If True, tries to read the boundary names from the case.
        Otherwise write the general ".*" boundary field.
    writeProcBoundary : bool
       If true writes the proc boundaries.
       If readBoundaryFromCase is True, then write the specific proc for each processor (when it is parallel)
       Otherwise, write the "proc.*" boundary field.

    Returns
    -------

    """
    logger = get_classMethod_logger(self,"writeEmptyField")
    logger.info(f"Creating the field: {fieldName}. ")

    # Start from an empty field of the right dimensions/components.
    emptyField = self.OFObjectHome.getEmptyField(fieldName=fieldName, flowType=flowType)

    if readBoundaryFromCase:
        emptyField.readBoundariesFromCase(caseDirectory,readParallel=True)
    if writeProcBoundary:
        emptyField.addProcBoundary()

    emptyField.writeToCase(caseDirectory=caseDirectory, timeOrLocation=timeOrLocation)

template_add(name, objFile, workflowObj=None)

Adds a templates to the toolkit.

Templates can be
    - Flow : Holds Hermes flow templates.
    - Node : Holds a hermes node objects
    - Field : Holds a field templates.
            This can be
                * xarray
                * pandas/dask
                * constant

Parameters:

Name Type Description Default
name
required
objFile
required
workflowObj
None
Source code in hera/simulations/openFoam/toolkit.py
def template_add(self, name, objFile, workflowObj=None):
    """
        Adds a templates to the toolkit.

        Templates can be
            - Flow : Holds Hermes flow templates.
            - Node : Holds a hermes node objects
            - Field : Holds a field templates.
                    This can be
                        * xarray
                        * pandas/dask
                        * constant

    Parameters
    ----------
    name
        The name of the template to add.
    objFile
        The file holding the template object.
    workflowObj
        Optional workflow object associated with the template.

    Returns
    -------
    None

    """
    # NOTE(review): unimplemented stub — currently a no-op.
    pass

xarrayToSetFieldsDictDomain(xarrayData, xColumnName='x', yColumnName='y', zColumnName='z', time=None, timeColumn='time', **kwargs)

Converts the xarray to the setFields dict of the internal domain.

Not debugged.

Use

Parameters:

Name Type Description Default
xarrayData xarray dataset
The data set as xarray.
required
xColumnName string

The coordinate name of the x-axis If None, ignore that coordinate (the mesh should be in the right dimension).

'x'
yColumnName

The coordinate name of the y-axis If None, ignore that coordinate (the mesh should be in the right dimension).

'y'
zColumnName

The coordinate name of the z-axis If None, ignore that coordinate (the mesh should be in the right dimension).

'z'
time string

The time is not None, select the closest (nearest neighbours).

If None ignore the index.

None
kwargs dict

A mapping of OpenFoamField -> dataset Field If the dataset Field is a tuple, then write it as vector. Each component can be either a field, float (to keep fixed value). If you want to map a function, just create its value as a Field name.

so U = ("u","v",0) will map the feld 'u' as Ux, 'v' as Uy and Uz=0

{}

Returns:

Type Description
string (setField dict).
Source code in hera/simulations/openFoam/toolkit.py
def xarrayToSetFieldsDictDomain(self, xarrayData, xColumnName="x", yColumnName="y", zColumnName="z", time=None,
                                timeColumn="time", **kwargs):
    """
        Converts the xarray to the setFields dict of the internal domain.

        Fixes over the original (which was marked "Not debugged"):
          * product() is now fed ranges, not raw cell counts (ints are not iterable).
          * The per-component value list no longer shadows the dataset variable
            (the original appended to a list and then string-indexed it).
          * Plain string field names are no longer mis-detected as vectors
            (str is Iterable).
          * Values are converted to str before joining into OpenFOAM vectors.
          * The nearest-time selection is hoisted out of the cell loop.

    Parameters
    ----------
    xarrayData : xarray dataset
            The data set as xarray.
    xColumnName : string
        The coordinate name of the x-axis
        If None, ignore that coordinate (the mesh should be in the right dimension).
    yColumnName: string
        The coordinate name of the y-axis
        If None, ignore that coordinate (the mesh should be  in the right dimension).
    zColumnName: string
        The coordinate name of the z-axis
        If None, ignore that coordinate (the mesh should be  in the right dimension).
    time : string
        The time is not None, select the closest (nearest neighbours).

        If None ignore the index.
    kwargs : dict
        A mapping of OpenFoamField -> dataset Field
        If the dataset Field is a tuple, then write it as vector.
        Each component can be either a field, float (to keep fixed value).
        If you want to map a function, just create its value as a Field name.

        so
            U = ("u","v",0)
            will map the feld 'u' as Ux, 'v' as Uy and Uz=0


    Returns
    -------
        string (setField dict).

    """
    logger = get_classMethod_logger(self, "xarrayToSetFieldsDictDomain")
    logger.debug(f"------------ Start  : {logger.name} ")

    # Build the list of active coordinate axes. For each one store the number
    # of cells (shape - 1, since we iterate cell intervals, not grid points).
    cellCounts = []
    coordNames = []
    for columnName in (xColumnName, yColumnName, zColumnName):
        if columnName is not None:
            cellCounts.append(xarrayData.coords[columnName].shape[0] - 1)
            coordNames.append(columnName)

    ret = []
    # OpenFOAM vector literal: values joined by spaces inside parentheses.
    arryToOFvector = lambda arry: f"({' '.join(str(item) for item in arry)} )"

    # Select the nearest timestep once (loop invariant) rather than per cell.
    if time is not None:
        dataset = xarrayData.sel(**{timeColumn: time}, method='nearest')
    else:
        dataset = xarrayData

    # Iterate over all cells using Cartesian product of index ranges.
    # Each X is a tuple of indices (ix, iy, iz) defining one cell.
    for X in product(*[range(count) for count in cellCounts]):
        lowerLeft = [xarrayData.coords[coordNames[coordIndx]][ind].item() for coordIndx, ind in enumerate(X)]
        upperRight = [xarrayData.coords[coordNames[coordIndx]][ind + 1].item() for coordIndx, ind in enumerate(X)]

        fieldValueStr = ""
        for OFField, fieldName in kwargs.items():

            # isel accessor for this cell (e.g. {"x": 3, "y": 5}).
            accessDict = dict(zip(coordNames, X))

            # A tuple/list mapping (e.g. U=("u","v",0)) is a vector/tensor.
            # Each component is a field name (string) or a fixed number.
            # Exclude str explicitly: strings are Iterable too.
            if isinstance(fieldName, Iterable) and not isinstance(fieldName, str):
                components = []
                for mappedField in fieldName:
                    if isinstance(mappedField, str):
                        components.append(dataset[mappedField].isel(**accessDict).item())
                    elif isinstance(mappedField, (float, int)):
                        components.append(mappedField)
                    else:
                        err = f"The mapping {OFField}->{fieldName} contains a value that is not a string or number. "
                        raise ValueError(err)
                if len(components) == 3:
                    fieldValueStr += f"volVectorFieldValue {OFField} {arryToOFvector(components)}\n"
                elif len(components) == 9:
                    fieldValueStr += f"volTensorFieldValue {OFField} {arryToOFvector(components)}\n"
                else:
                    err = f"The number of components in the  mapping {OFField}->{fieldName} must be 1,3 or 9. got {len(components)}."
                    raise ValueError(err)
            else:
                value = dataset[fieldName].isel(**accessDict).item()

                fieldValueStr += f"volScalarFieldValue {OFField} {value}\n"

        BoxRecord = f"""
            boxToCell 
            {{
                box {arryToOFvector(lowerLeft)} {arryToOFvector(upperRight)};
                fieldValues 
                (
                    {fieldValueStr}

                );                  
            }}
        """
        ret.append(BoxRecord)
    return "\n".join(ret)

getVTKPipelineCacheDocuments(regularMesh=None, filterName=None, workflowName=None, groupName=None)

Return the list of the cached documents in the case.

Parameters:

Name Type Description Default
regularMesh
None
filterName
None
Source code in hera/simulations/openFoam/toolkit.py
def getVTKPipelineCacheDocuments(self, regularMesh=None, filterName=None, workflowName=None, groupName=None):
    """
        Return the list of the cached VTK-pipeline documents in the case.

        Only criteria that are not None are added to the query.
    Parameters
    ----------
    regularMesh
        Optional filter on the regularMesh flag.
    filterName
        Optional filter on the filter name.
    workflowName
        Optional filter on the simulation workflow name.
    groupName
        Optional filter on the simulation group name.

    Returns
    -------
        The list of matching cache documents.
    """
    query = dict()
    if regularMesh is not None:
        query['regularMesh'] = regularMesh
    if filterName is not None:
        query['filterName'] = filterName

    # workflowName/groupName live under the nested 'simulation' document.
    simulationQuery = dict()
    if workflowName is not None:
        simulationQuery['workflowName'] = workflowName
    if groupName is not None:
        simulationQuery['groupName'] = groupName
    if simulationQuery:
        query['simulation'] = simulationQuery

    return self.getCacheDocuments(type=TYPE_VTK_FILTER, **dictToMongoQuery(query))

getVTKPipelineCacheTable(regularMesh=None, filterName=None, workflowName=None, groupName=None)

Return the table.

Parameters:

Name Type Description Default
regularMesh
None
filterName
None
workflowName
None
groupName
None
Source code in hera/simulations/openFoam/toolkit.py
def getVTKPipelineCacheTable(self,regularMesh=None, filterName=None, workflowName=None, groupName=None):
    """
        Return the cached VTK-pipeline documents as a pandas table with the
        columns filterName, workflowName and groupName.
    Parameters
    ----------
    regularMesh
        Optional filter on the regularMesh flag.
    filterName
        Optional filter on the filter name.
    workflowName
        Optional filter on the simulation workflow name.
    groupName
        Optional filter on the simulation group name.

    Returns
    -------
    pandas.DataFrame
    """
    docList = self.getVTKPipelineCacheDocuments(regularMesh=regularMesh, filterName=filterName,
                                                workflowName=workflowName, groupName=groupName)
    rows = []
    for doc in docList:
        rows.append(dict(filterName=doc.desc['filterName'],
                         workflowName=doc.desc['simulation']['workflowName'],
                         groupName=doc.desc['simulation']['groupName']))
    return pandas.DataFrame(rows)

clearVTKPipelineCache(regularMesh=None, filterName=None, workflowName=None, groupName=None)

deletes the cache documents and the data from the disk.
Use with care!

Parameters:

Name Type Description Default
regularMesh
None
filterName
None
workflowName
None
groupName
None
Source code in hera/simulations/openFoam/toolkit.py
def clearVTKPipelineCache(self, regularMesh=None, filterName=None, workflowName=None, groupName=None):
    """
        Deletes the cache documents and the data from the disk.
        Use with care!

    Parameters
    ----------
    regularMesh
        Restrict the deletion to this regular-mesh flag (ignored when None).
    filterName
        Restrict the deletion to this filter name (ignored when None).
    workflowName
        Restrict the deletion to simulations of this workflow (ignored when None).
    groupName
        Restrict the deletion to simulations of this group (ignored when None).

    Returns
    -------
    None
    """

    # 1. Get the potential filters to process.
    # Fixed: the logger used to be created with the wrong method name ("clearCache"),
    # which mis-attributed the log records.
    logger = get_classMethod_logger(self, "clearVTKPipelineCache")

    docList = self.getVTKPipelineCacheDocuments(regularMesh=regularMesh, filterName=filterName, workflowName=workflowName, groupName=groupName)

    logger.info(f"Found {len(docList)} documents to delete. ")
    for doc in docList:
        logger.debug(f"Deleting resource {doc['desc']['filterName']} : {doc['desc']['pipeline']['filters']} ")
        outputFile = doc['resource']

        # The resource can be either a single file or a directory tree.
        if os.path.exists(outputFile):
            if os.path.isfile(outputFile):
                os.remove(outputFile)
            else:
                shutil.rmtree(outputFile)

    # Remove the DB metadata only after all the resources were deleted from disk.
    for doc in docList:
        doc.delete()

getTimeList(nameOrWorkflowFileOrJSONOrResourceorDirectory, singleProcessor=False, returnFirst=True)

Extract the computed Time steps from the case.
Tries to load as parallel, and if that fails falls back to the single-processor layout (unless singleProcessor is True).

Parameters:

Name Type Description Default
nameOrWorkflowFileOrJSONOrResourceorDirectory
the case directory, Name, directory, json, or json-file of a case.
required
singleProcessor
False
Source code in hera/simulations/openFoam/toolkit.py
def getTimeList(self, nameOrWorkflowFileOrJSONOrResourceorDirectory, singleProcessor=False, returnFirst=True):
    """
        Extract the computed time steps from the case.
        Tries to load as parallel, and if that fails falls back to the
        single-processor layout (unless singleProcessor is True).

    Parameters
    ----------
    nameOrWorkflowFileOrJSONOrResourceorDirectory: str
            the case directory, Name, directory, json, or json-file of a case.
    singleProcessor: bool
            If True, read the time steps from the case root even when
            processor* directories exist.
    returnFirst: bool
            If True return only the time list of the first case;
            otherwise return a dict mapping each workflow name to its time list.

    Returns
    -------
    list or dict
        The sorted time steps of the first case (returnFirst=True),
        or a dict of workflowName -> sorted time steps.
    """
    logger = get_classMethod_logger(self, "getTimeList")
    logger.info("Getting the simulation from the database")
    docList = self.getWorkflowDocumentFromDB(nameOrWorkflowFileOrJSONOrResource=nameOrWorkflowFileOrJSONOrResourceorDirectory)
    if len(docList) == 0:
        logger.info(f"Simulation not found, trying to interpret as a directory on the disk")
        if os.path.exists(nameOrWorkflowFileOrJSONOrResourceorDirectory):
            workflowFile = os.path.basename(nameOrWorkflowFileOrJSONOrResourceorDirectory)
            workflowName = os.path.splitext(workflowFile)[0]
            casePathList = [(workflowName, nameOrWorkflowFileOrJSONOrResourceorDirectory)]
        else:
            err = f"Case {nameOrWorkflowFileOrJSONOrResourceorDirectory} not found in DB or on disk"
            logger.error(err)
            raise ValueError(err)
    else:
        casePathList = [(x.desc['workflowName'], x.getData()) for x in docList]

    def _timeStepsIn(directory):
        """Sorted time-step values of the time directories directly under `directory`."""
        nonTimeDirs = ["constant", "system", "rootCase", 'VTK']
        # Note: item.isdigit() keeps only integer-named time directories,
        # matching the original behavior.
        return sorted(float(item) for item in os.listdir(directory) if (
                os.path.isdir(os.path.join(directory, item)) and
                item.isdigit() and
                not item.startswith("processor") and
                item not in nonTimeDirs))

    ret = dict()
    for name, case in casePathList:

        processorList = [os.path.basename(proc) for proc in glob.glob(os.path.join(case, "processor*"))]
        isSingle = len(processorList) == 0 or singleProcessor
        if isSingle:
            # Serial run: the time directories sit directly in the case root.
            # Fixed: the original tested os.path.isdir(x) relative to the current
            # working directory; the entry must be joined to the case directory.
            timeList = _timeStepsIn(case)
        else:
            # Parallel run: read the time directories of the first processor.
            timeList = _timeStepsIn(os.path.join(case, processorList[0]))
        ret[name] = timeList

    return ret[casePathList[0][0]] if returnFirst else ret

LSM

LSMToolkit

hera.simulations.LSM.toolkit.LSMToolkit

Bases: abstractToolkit

The LSM.old toolkit

The datasources are the templates.

The LSM.old template JSON structure is :

<< TO DO >>

Source code in hera/simulations/LSM/toolkit.py
class LSMToolkit(toolkit.abstractToolkit):
    """
        The LSM.old toolkit

        The datasources are the templates.

        The LSM.old template JSON structure is :


        << TO DO >>



    """
    # Fortran-style boolean literals used by the LSM input files.
    TRUE = ".TRUE."
    FALSE = ".FALSE."
    _to_xarray = None
    _to_database = None
    _forceKeep = None
    _analysis = None

    @property
    def analysis(self):
        # The analysis helper bound to this toolkit instance.
        return self._analysis

    @property
    def to_xarray(self):
        # Whether the simulation results are converted to xarray.
        return self._to_xarray

    @to_xarray.setter
    def to_xarray(self, value):
        if not isinstance(value, bool):
            raise ValueError("to_xarray must be boolean")
        self._to_xarray = value

    @property
    def to_database(self):
        # Whether the simulation run is recorded in the database.
        return self._to_database

    @to_database.setter
    def to_database(self, value):
        if not isinstance(value, bool):
            # Fixed: the message used to (wrongly) refer to to_xarray.
            raise ValueError("to_database must be boolean")
        self._to_database = value

    @property
    def forceKeep(self):
        # Whether the original Lagrangian files are kept after conversion.
        return self._forceKeep

    @forceKeep.setter
    def forceKeep(self, value):
        if not isinstance(value, bool):
            # Fixed: the message used to (wrongly) refer to to_xarray.
            raise ValueError("forceKeep must be boolean")
        self._forceKeep = value

    @property
    def singleSimulation(self):
        # Expose the SingleSimulation class for the users of the toolkit.
        return SingleSimulation

    def __init__(self, projectName, filesDirectory=None, to_xarray=True, to_database=False, forceKeep=False, connectionName=None):
        """
            Initializes the LSM.old toolkit

        Parameters
        ----------

        projectName: str
            The name of the project that contains the data.

        filesDirectory: str
            The directory to save the simulations in.


        to_xarray: bool
            Save the simulation results into xarray or not

        to_database: bool
            Save the simulation run in the database or not

        forceKeep: bool
            If to_xarray is true, determines whether to keep the original files.
            If False, removes the Lagrangian files.

        connectionName: str
            The name of the database connection to use.
        """
        super().__init__(projectName=projectName, toolkitName="LSM.old", filesDirectory=filesDirectory, connectionName=connectionName)

        self.to_xarray = to_xarray
        self.to_database = to_database
        self.forceKeep = forceKeep
        self._analysis = analysis(self)

    def getTemplates(self, **query):
        """
            Get a list of Template objects that fulfill the query

        Returns
        -------
            List of LSMTemplates

        """
        docList = self.getDataSourceDocumentsList(**query)
        return [LSMTemplate(doc, self) for doc in docList]

    def getTemplateByName(self, templateName, templateVersion=None):
        """
            Retrieve the template by its name.

        Parameters
        ----------

        templateName: str
                The name of the template.
        templateVersion: str
                The version of the template (latest when None).
        Returns
        -------
            The template with that name, or None when it does not exist.
        """
        doc = self.getDataSourceDocument(datasourceName=templateName, version=templateVersion)
        return LSMTemplate(doc, self) if doc is not None else None

    def getTemplatesTable(self, **query):
        """
            List the template parameters that fulfil the query as a table.
        :param query: mongo-style filters on the template documents.
        :return: pandas.DataFrame with one row per template (empty when none match).
        """
        docList = self.getDataSourceDocumentsList(**query)
        if len(docList) == 0:
            return pandas.DataFrame()

        frames = []
        for doc in docList:
            desc = doc.desc.copy()
            desc['id'] = doc.id
            desc['projectName'] = doc.projectName
            # Pair each description with ITS OWN parameters.
            # Fixed: the original used itertools.product, which cross-joined
            # every description with every parameter set (N docs -> N*N rows).
            params = desc.pop('params')
            frames.append(pandas.DataFrame(desc, index=[0]).join(pandas.DataFrame(params, index=[0])))
        return pandas.concat(frames, ignore_index=True, sort=False)


    def loadData(self, fileNameOrData, saveMode=toolkit.TOOLKIT_SAVEMODE_FILEANDDB, **kwargs):
        """
            Load a template object. Possibly saves to the DB.

        Parameters
        ----------
        fileNameOrData: str
                The path of a JSON file that describes a template.

        saveMode: str
                Can be either:

                    - TOOLKIT_SAVEMODE_NOSAVE   : Just create template object


                    - TOOLKIT_SAVEMODE_FILEANDDB : Creates the template and store to the DB as a source.
                                                    Raise exception if the entry exists.

                    - TOOLKIT_SAVEMODE_FILEANDDB_REPLACE: Creates the template and store to the DB as a source.
                                                    Replace the entry in the DB if it exists.

        Returns
        -------
            template.LSMTemplate object

            Return the template object.
        """

        if isinstance(fileNameOrData, str):
            if os.path.isfile(fileNameOrData):
                with open(fileNameOrData) as templateFile:
                    desc = json.load(templateFile)
            else:
                raise ValueError("fileNameOrData must be a JSON template file")
        else:
            raise ValueError("fileNameOrData must be a JSON template file")

        templateName = desc['name']
        version = kwargs.get("version", None)

        doc = None
        if saveMode in [toolkit.TOOLKIT_SAVEMODE_FILEANDDB, toolkit.TOOLKIT_SAVEMODE_FILEANDDB_REPLACE]:

            doc = self.getDataSourceDocument(templateName)

            if doc is None:
                # First time this template is seen: register it as a data source.
                self.addDataSource(dataSourceName=templateName,
                                   resource=fileNameOrData,
                                   dataFormat=datalayer.datatypes.STRING,
                                   version=version,
                                   **kwargs)

            else:
                if (saveMode == toolkit.TOOLKIT_SAVEMODE_FILEANDDB):
                    raise ValueError(f"Template {templateName} already exists in the DB")

                # Replace mode: overwrite the existing record in place.
                doc.resource = fileNameOrData
                doc.desc = kwargs
                doc.desc['version'] = version

        if doc is None:
            doc = datalayer.nonDBMetadataFrame(data=fileNameOrData, **kwargs)

        return LSMTemplate(doc, self)

    def getSimulations(self, simulationName=None, unitsTemplateVersion="v4-general", **query):
        """
        Get a list of SingleSimulation objects that fulfill the query.

        Unit-carrying query values (unum.Unum or pint.Quantity) are converted to
        the numeric values expected by the units template before querying.

        :param simulationName: restrict to this simulation name (optional).
        :param unitsTemplateVersion: the template that declares the expected units.
        :param query: field=value filters on the simulation parameters.
        :return: list of SingleSimulation.
        """
        template = self.getTemplateByName(unitsTemplateVersion)

        if 'units' in template._document['desc']:
            for key in template._document['desc']["units"].keys():
                if key in query.keys():
                    query_item = query[key]
                    if isinstance(query_item, Unum):
                        # NOTE(review): eval() of the unit expression stored in the
                        # template - make sure the templates in the DB are trusted.
                        query[key] = query_item.asNumber(eval(template._document['desc']["units"][key]))
                    elif isinstance(query_item, Quantity):
                        query[key] = query_item.m_as(template._document['desc']["units"][key])
                    else:
                        raise ValueError(f"query must use either pint or unum to specify units, currently type({query[key]})={type(query[key])}")
        else:
            query = ConfigurationToJSON(query)

        # The simulation parameters live under the 'params' sub-document.
        queryWithParams = {}
        for key in query.keys():
            queryWithParams[f"params__{key}"] = query[key]

        if simulationName is not None:
            queryWithParams["simulationName"] = simulationName

        docList = self.getSimulationsDocuments(type=LSMTemplate("", self).doctype_simulation, **queryWithParams)
        retList = []
        for doc in docList:
            try:
                retList.append(SingleSimulation(doc))
            # Fixed: was a bare except; keep the best-effort behavior but do not
            # swallow KeyboardInterrupt/SystemExit.
            except Exception:
                print(f"Warning: could not find data with the following document: {doc.asDict()}")

        return retList

    def getSimulationsList(self, wideFormat=False, **query):
        """
            List the Simulation parameters that fulfil the query.
        :param wideFormat: if True pivot to one row per simulation.
        :param query: mongo-style filters on the simulation documents.
        :return: pandas.DataFrame indexed by the document id.
        :raises FileNotFoundError: when no simulation matches the query.
        """
        docList = self.getSimulationsDocuments(type=LSMTemplate("", self).doctype_simulation, **query)
        new_df_list = []
        for doc in docList:
            desc = doc.desc.copy()
            desc['id'] = doc.id
            # Pair each description with ITS OWN parameters.
            # Fixed: the original used itertools.product, which cross-joined
            # every description with every parameter set.
            params = desc.pop('params')
            params_df = pandas.DataFrame(params, index=[0])
            params_df = params_df.rename(columns={name: f"params__{name}" for name in params_df.columns})
            df = pandas.DataFrame(desc, index=[0]).join(params_df)
            docId = df['id'][0]
            melted = df.drop(columns=['id']).melt()
            melted.index = [docId] * len(melted)
            new_df_list.append(melted)
        try:
            df = pandas.concat(new_df_list)
        except ValueError:
            raise FileNotFoundError('No simulations.old found')
        if wideFormat:
            return df.pivot(columns='variable', values='value')
        else:
            return df


    def prepareSlurmLSMExecution(self, baseParameters,
                              jsonVariations,
                              templateName,
                              filesDirectory=None,
                              to_xarray=True,
                              to_database=False,
                              forceKeep=False,
                              connectionName=None,
                              stations=None,
                              topography=None,
                              depositionRates=None,
                              canopy=None,
                              saveMode=toolkit.TOOLKIT_SAVEMODE_FILEANDDB,
                              slurmExecutionFileName="submit_all.sh",
                              caseListFileName="cases.txt",
                              allocateProcessorsPerRun=None,
                              memoryInGB=None,
                              jobName="lsm_cases",
                              exclusive=False):
        """
            Creates a slurm setup to run many LSM simulations against specific variation

        Parameters
        ----------
        baseParameters : dict
                base parameters
        jsonVariations : str | dict
                Variation file (using the jsonutils variations) format to apply on the base parameters
        templateName : str
                The name of the LSM template to run.
        slurmExecutionFileName: str
                The name of the bash file to create with the slurm batch run
        caseListFileName: str
                The batchfile uses case file name, so add it.
        allocateProcessorsPerRun : int | None
                How many nodes(currently just threads) are used per job, in slurm documentation they describe "This option advises ... that job steps run ... will launch a maximum of number tasks and to provide for sufficient resources."
        memoryInGB : str | int | None
                Should memory be limited, effect depends on configuration(might kill if memory use is exceeded)
        jobName : str
                name for slurm job
        exclusive : bool
                Should slurm run one job at a time on a GRES(Generic RESource in our case CPUs)
        Returns
        -------
        None
        """
        # Fixed: the logger used to carry the wrong method name.
        logger = get_classMethod_logger(self, "prepareSlurmLSMExecution")
        simList = ""
        if not isinstance(baseParameters, dict):
            logger.error("Slurm preparation can only handle dictionary of parameters to run variations")

        if isinstance(jsonVariations, str):
            logger.info(f"Assuming {jsonVariations} is path to variations file")
            with open(jsonVariations, 'r') as variationsFile:
                jsonVariations = json.load(variationsFile)
        elif not isinstance(jsonVariations, dict):
            logger.error("Slurm preparation only supports json variation input as path or dict")

        SIMULATIONS_SCRIPT_DIR_NAME = "simulationsScripts"
        STATIONS_PATH = "stations.parquet"
        RUN_SIM_FILE_NAME = "run_sim.py"

        simulations_scripts_dir = Path(self.filesDirectory, SIMULATIONS_SCRIPT_DIR_NAME)
        os.makedirs(simulations_scripts_dir, exist_ok=True)
        if stations is not None:
            stations.to_parquet(simulations_scripts_dir / STATIONS_PATH)

        # Quote the string-valued script arguments ONCE, before the loop.
        # Fixed: the original quoted topography twice inside the loop (and re-wrapped
        # it on every iteration), so the generated scripts received ""path"".
        if isinstance(topography, str):
            topography = f'"{topography}"'
        if isinstance(canopy, str):
            canopy = f'"{canopy}"'
        # filesDirectory is interpolated into python source: it must be quoted too.
        filesDirectoryParam = "None" if filesDirectory is None else f'"{filesDirectory}"'
        connectionNameParam = f'"{connectionName}"' if connectionName is not None else "None"
        read_stations_line = f'stations = pandas.read_parquet("' + STATIONS_PATH + '")'

        for i, jsonConfig in enumerate(JSONVariations(baseParameters, jsonVariations)):
            jsonConfig = JSONToConfiguration(jsonConfig)
            jsonConfig = LSMTemplate.prepareParams(desc=None, paramsToPrepare=jsonConfig)
            simName = f"LSM_Simulation_{i}"
            simSavePath = simulations_scripts_dir / (simName + ".py")

            os.makedirs(simulations_scripts_dir / simName, exist_ok=True)

            sim_script = f"""
from hera import toolkitHome, ToolkitHome
from hera.simulations.LSM.toolkit import LSMToolkit
import pandas
old_lsm_toolkit = toolkitHome.getToolkit(toolkitName=ToolkitHome.LSM, projectName="{self.projectName}", filesDirectory={filesDirectoryParam}, to_xarray={to_xarray}, to_database={to_database}, forceKeep={forceKeep}, connectionName={connectionNameParam})

lsm_template = old_lsm_toolkit.getTemplates(template="{templateName}")[0]
params={jsonConfig}
{read_stations_line if stations is not None else ""}
lsm_template.run(topography={topography}, stations={"None" if stations is None else "stations"},canopy={"None" if canopy is None else canopy},depositionRates={"None" if depositionRates is None else depositionRates}, saveMode="{saveMode}",simulationName="{simName}",**params)
"""
            with open(simSavePath, "w") as f:
                f.write(sim_script)

            simList += f"{simName}\n"

        # NOTE(review): the per-case script runs run_sim.py inside each case
        # directory, while the generated files are named <simName>.py in the
        # scripts directory - confirm against slurm.prepareSlurmScriptExecution.
        script = f"""
cd "$dir"
python {RUN_SIM_FILE_NAME}
        """
        caseListFilePath = simulations_scripts_dir / caseListFileName

        logger.info(f"Writing simulation list file")

        with open(caseListFilePath, "w") as outputFile:
            outputFile.write(simList)
        slurm.prepareSlurmScriptExecution(script=script,
                                          slurmExecutionFilePath=simulations_scripts_dir / slurmExecutionFileName,
                                          jobDirListFilePath=caseListFilePath,
                                          allocateProcessorsPerRun=allocateProcessorsPerRun,
                                          memoryInGB=memoryInGB,
                                          jobName=jobName,
                                          quiet=False,
                                          exclusive=exclusive)

__init__(projectName, filesDirectory=None, to_xarray=True, to_database=False, forceKeep=False, connectionName=None)

Initializes the LSM.old toolkit

Parameters:

Name Type Description Default
projectName

The name of the project that contains the

required
filesDirectory

The directory to save the simulations.old.

None
to_xarray

Save the simulation results into xarray or not

True
to_database

Save the simulation run in the database or not

False
forceKeep

If to_xarray is true, determines whether to keep the original files. If False, removes the Lagrangian files.

False
Source code in hera/simulations/LSM/toolkit.py
def __init__(self, projectName, filesDirectory=None, to_xarray=True, to_database=False, forceKeep=False, connectionName=None):
    """
        Initialize the LSM.old toolkit.

    Parameters
    ----------

    projectName: str
        The name of the project that contains the data.

    filesDirectory: str
        The directory in which the simulations are saved.

    to_xarray: bool
        Whether to convert the simulation results to xarray.

    to_database: bool
        Whether to record the simulation run in the database.

    forceKeep: bool
        When to_xarray is True, keep the original Lagrangian files if True;
        otherwise remove them.

    connectionName: str
        The name of the database connection to use.
    """
    super().__init__(projectName=projectName,
                     toolkitName="LSM.old",
                     filesDirectory=filesDirectory,
                     connectionName=connectionName)

    # The setters validate that the flags are booleans.
    self.forceKeep = forceKeep
    self.to_database = to_database
    self.to_xarray = to_xarray
    self._analysis = analysis(self)

getTemplates(**query)

Get a list of Template objects that fulfill the query

Returns:

Type Description
List of LSMTemplates
Source code in hera/simulations/LSM/toolkit.py
def getTemplates(self, **query):
    """
        Get the LSMTemplate objects that fulfill the query.

    Returns
    -------
        List of LSMTemplates
    """
    return [LSMTemplate(document, self)
            for document in self.getDataSourceDocumentsList(**query)]

getTemplateByName(templateName, templateVersion=None)

Retrieve the template by its name.

Parameters:

Name Type Description Default
templateName
The name of the template.
required
templateVersion
The name of the template.
None

Returns:

Type Description
The template by the name
Source code in hera/simulations/LSM/toolkit.py
def getTemplateByName(self, templateName, templateVersion=None):
    """
        Retrieve a template by its name (and optionally its version).

    Parameters
    ----------

    templateName: str
            The name of the template.
    templateVersion: str
            The version of the template (latest when None).
    Returns
    -------
        The LSMTemplate with that name, or None when it does not exist.
    """
    document = self.getDataSourceDocument(datasourceName=templateName, version=templateVersion)
    if document is None:
        return None
    return LSMTemplate(document, self)

getTemplatesTable(**query)

:list the template parameters that fulfil the query

:param query: :return:

Source code in hera/simulations/LSM/toolkit.py
def getTemplatesTable(self, **query):
    """
        List the template parameters that fulfil the query as a table.
    :param query: mongo-style filters on the template documents.
    :return: pandas.DataFrame with one row per template (empty when none match).
    """
    docList = self.getDataSourceDocumentsList(**query)
    if len(docList) == 0:
        return pandas.DataFrame()

    frames = []
    for doc in docList:
        desc = doc.desc.copy()
        desc['id'] = doc.id
        desc['projectName'] = doc.projectName
        # Pair each description with ITS OWN parameters.
        # Fixed: the original used itertools.product, which cross-joined every
        # description with every parameter set (N docs -> N*N rows).
        params = desc.pop('params')
        frames.append(pandas.DataFrame(desc, index=[0]).join(pandas.DataFrame(params, index=[0])))
    return pandas.concat(frames, ignore_index=True, sort=False)

loadData(fileNameOrData, saveMode=toolkit.TOOLKIT_SAVEMODE_FILEANDDB, **kwargs)

Load a template object. Possibly saves to the DB.

Parameters:

Name Type Description Default
fileNameOrData
If str , a file or an JSON str that describes a template.
required
saveMode
Can be either:

    - TOOLKIT_SAVEMODE_NOSAVE   : Just create template object


    - TOOLKIT_SAVEMODE_FILEANDDB : Creates the template and store to the DB as a source.
                                    Raise exception if the entry exists.

    - TOOLKIT_SAVEMODE_FILEANDDB_REPLACE: Creates the template and store to the DB as a source.
                                    Replace the entry in the DB if it exists.
TOOLKIT_SAVEMODE_FILEANDDB

Returns:

Type Description
template.LSMTemplate object

Return the template object.

Source code in hera/simulations/LSM/toolkit.py
def loadData(self, fileNameOrData, saveMode=toolkit.TOOLKIT_SAVEMODE_FILEANDDB, **kwargs):
    """
        Load a template object, possibly saving it to the DB.

    Parameters
    ----------
    fileNameOrData: str
            The path of a JSON file that describes a template.

    saveMode: str
            One of:

                - TOOLKIT_SAVEMODE_NOSAVE   : Just create template object


                - TOOLKIT_SAVEMODE_FILEANDDB : Creates the template and store to the DB as a source.
                                                Raise exception if the entry exists.

                - TOOLKIT_SAVEMODE_FILEANDDB_REPLACE: Creates the template and store to the DB as a source.
                                                Replace the entry in the DB if it exists.

    Returns
    -------
        template.LSMTemplate object

        Return the template object.
    """

    if not (isinstance(fileNameOrData, str) and os.path.isfile(fileNameOrData)):
        raise ValueError("fileNameOrData must be a JSON template file")

    with open(fileNameOrData) as templateFile:
        desc = json.load(templateFile)

    templateName = desc['name']
    version = kwargs.get("version", None)

    doc = None
    saveToDB = saveMode in (toolkit.TOOLKIT_SAVEMODE_FILEANDDB,
                            toolkit.TOOLKIT_SAVEMODE_FILEANDDB_REPLACE)
    if saveToDB:
        doc = self.getDataSourceDocument(templateName)

        if doc is None:
            # First time this template is seen: register it as a data source.
            self.addDataSource(dataSourceName=templateName,
                               resource=fileNameOrData,
                               dataFormat=datalayer.datatypes.STRING,
                               version=version,
                               **kwargs)
        elif saveMode == toolkit.TOOLKIT_SAVEMODE_FILEANDDB:
            raise ValueError(f"Template {templateName} already exists in the DB")
        else:
            # Replace mode: overwrite the existing record in place.
            doc.resource = fileNameOrData
            doc.desc = kwargs
            doc.desc['version'] = version

    if doc is None:
        doc = datalayer.nonDBMetadataFrame(data=fileNameOrData, **kwargs)

    return LSMTemplate(doc, self)

getSimulations(simulationName=None, unitsTemplateVersion='v4-general', **query)

get a list of SingleSimulation objects that fulfill the query :param query: :return:

Source code in hera/simulations/LSM/toolkit.py
def getSimulations(self, simulationName=None, unitsTemplateVersion="v4-general", **query):
    """
    Get a list of SingleSimulation objects that fulfill the query.

    Unit-carrying query values (unum.Unum or pint.Quantity) are converted to
    the numeric values expected by the units template before querying.

    :param simulationName: restrict to this simulation name (optional).
    :param unitsTemplateVersion: the template that declares the expected units.
    :param query: field=value filters on the simulation parameters.
    :return: list of SingleSimulation.
    """
    template = self.getTemplateByName(unitsTemplateVersion)

    if 'units' in template._document['desc']:
        for key in template._document['desc']["units"].keys():
            if key in query.keys():
                query_item = query[key]
                if isinstance(query_item, Unum):
                    # NOTE(review): eval() of the unit expression stored in the
                    # template - make sure the templates in the DB are trusted.
                    query[key] = query_item.asNumber(eval(template._document['desc']["units"][key]))
                elif isinstance(query_item, Quantity):
                    query[key] = query_item.m_as(template._document['desc']["units"][key])
                else:
                    raise ValueError(f"query must use either pint or unum to specify units, currently type({query[key]})={type(query[key])}")
    else:
        query = ConfigurationToJSON(query)

    # The simulation parameters live under the 'params' sub-document.
    queryWithParams = {}
    for key in query.keys():
        queryWithParams[f"params__{key}"] = query[key]

    if simulationName is not None:
        queryWithParams["simulationName"] = simulationName

    docList = self.getSimulationsDocuments(type=LSMTemplate("", self).doctype_simulation, **queryWithParams)
    retList = []
    for doc in docList:
        try:
            retList.append(SingleSimulation(doc))
        # Fixed: was a bare except; keep the best-effort behavior but do not
        # swallow KeyboardInterrupt/SystemExit. Also removed the leftover
        # debug print(query) that polluted stdout on every call.
        except Exception:
            print(f"Warning: could not find data with the following document: {doc.asDict()}")

    return retList

getSimulationsList(wideFormat=False, **query)

List the Simulation parameters that fulfil the query

:param query: :return:

Source code in hera/simulations/LSM/toolkit.py
def getSimulationsList(self, wideFormat=False, **query):
    """
        List the Simulation parameters that fulfil the query.
    :param wideFormat: if True pivot to one row per simulation.
    :param query: mongo-style filters on the simulation documents.
    :return: pandas.DataFrame indexed by the document id.
    :raises FileNotFoundError: when no simulation matches the query.
    """
    docList = self.getSimulationsDocuments(type=LSMTemplate("", self).doctype_simulation, **query)
    new_df_list = []
    for doc in docList:
        desc = doc.desc.copy()
        desc['id'] = doc.id
        # Pair each description with ITS OWN parameters.
        # Fixed: the original used itertools.product, which cross-joined every
        # description with every parameter set (N docs -> N*N rows).
        params = desc.pop('params')
        params_df = pandas.DataFrame(params, index=[0])
        params_df = params_df.rename(columns={name: f"params__{name}" for name in params_df.columns})
        df = pandas.DataFrame(desc, index=[0]).join(params_df)
        docId = df['id'][0]
        melted = df.drop(columns=['id']).melt()
        melted.index = [docId] * len(melted)
        new_df_list.append(melted)
    try:
        df = pandas.concat(new_df_list)
    except ValueError:
        # pandas.concat raises ValueError on an empty list.
        raise FileNotFoundError('No simulations.old found')
    if wideFormat:
        return df.pivot(columns='variable', values='value')
    else:
        return df

prepareSlurmLSMExecution(baseParameters, jsonVariations, templateName, filesDirectory=None, to_xarray=True, to_database=False, forceKeep=False, connectionName=None, stations=None, topography=None, depositionRates=None, canopy=None, saveMode=toolkit.TOOLKIT_SAVEMODE_FILEANDDB, slurmExecutionFileName='submit_all.sh', caseListFileName='cases.txt', allocateProcessorsPerRun=None, memoryInGB=None, jobName='lsm_cases', exclusive=False)

Creates a slurm setup to run many LSM simulations against specific variation

Parameters:

Name Type Description Default
baseParameters dict
base parameters
required
jsonVariations
Variation file (using the jsonutils variations) format to apply on the base paramaters
required
slurmExecutionFileName
The name of the bash file to create with the slurm batch run
'submit_all.sh'
caseListFileName
The batchfile uses case file name, so add it.
'cases.txt'
allocateProcessorsPerRun int | None
How many nodes(currently just threads) are used per job, in slurm documentation they describe "This option advises ... that job steps run ... will launch a maximum of number tasks and to provide for sufficient resources."
None
memoryInGB str | int | none
Should memory be limited, affect depends on configuration(might kill if memory use is exceeded)
None
jobName str
name for slurm job
'lsm_cases'
exclusive bool
Should slurm run one job at a time on a GRES(Generic RESource in our case CPUs)
False
Source code in hera/simulations/LSM/toolkit.py
    def prepareSlurmLSMExecution(self,baseParameters,
                              jsonVariations,
                              templateName,
                              filesDirectory=None,
                              to_xarray=True,
                              to_database=False,
                              forceKeep=False,
                              connectionName=None,
                              stations=None,
                              topography=None,
                              depositionRates=None,
                              canopy=None,
                              saveMode=toolkit.TOOLKIT_SAVEMODE_FILEANDDB,
                              slurmExecutionFileName="submit_all.sh",
                              caseListFileName="cases.txt",
                              allocateProcessorsPerRun=None,
                              memoryInGB=None,
                              jobName="lsm_cases",
                              exclusive=False):
        """
            Creates a slurm setup to run many LSM simulations against specific variation

        Parameters
        ----------
        baseParameters : dict
                base parameters
        jsonVariations :
                Variation file (using the jsonutils variations) format to apply on the base paramaters
        slurmExecutionFileName: str
                The name of the bash file to create with the slurm batch run
        caseListFileName:
                The batchfile uses case file name, so add it.
        allocateProcessorsPerRun : int | None
                How many nodes(currently just threads) are used per job, in slurm documentation they describe "This option advises ... that job steps run ... will launch a maximum of number tasks and to provide for sufficient resources."
        memoryInGB : str | int | none
                Should memory be limited, affect depends on configuration(might kill if memory use is exceeded)
        jobName : bool
                name for slurm job
        exclusive : bool
                Should slurm run one job at a time on a GRES(Generic RESource in our case CPUs)
        Returns
        -------

        """
        logger = get_classMethod_logger(self,"prepareSlurmWorkflowExecution")
        simList = ""
        if not isinstance(baseParameters, dict):
            logger.error("Slurm preparation can only handle dictionary of parameters to run variations")

        if isinstance(jsonVariations, str):
            logger.info(f"Assuming {jsonVariations} is path to variations file")
            with open(jsonVariations, 'r') as variationsFile:
                jsonVariations = json.load(variationsFile)
        elif not isinstance(jsonVariations, dict):
            logger.error("Slurm preparation only supports json variation input as path or dict")

        SIMULATIONS_SCRIPT_DIR_NAME= "simulationsScripts"
        STATIONS_PATH = "stations.parquet"
        RUN_SIM_FILE_NAME = "run_sim.py"

        simulations_scripts_dir = Path(self.filesDirectory,SIMULATIONS_SCRIPT_DIR_NAME)
        os.makedirs(simulations_scripts_dir, exist_ok=True)
        if stations is not None:
            stations.to_parquet(simulations_scripts_dir / STATIONS_PATH)
        for i, jsonConfig in enumerate(JSONVariations(baseParameters, jsonVariations)):
            jsonConfig = JSONToConfiguration(jsonConfig)
            jsonConfig = LSMTemplate.prepareParams(desc=None, paramsToPrepare=jsonConfig)
            simName = f"LSM_Simulation_{i}"
            simSavePath = simulations_scripts_dir/ (simName+".py")

            os.makedirs(simulations_scripts_dir / simName, exist_ok=True)
            if isinstance(topography, str):
                topography = f'"{topography}"'
            if isinstance(canopy, str):
                canopy = f'"{canopy}"'
            if isinstance(topography, str):
                topography = f'"{topography}"'
            read_stations_line = f'stations = pandas.read_parquet("'+STATIONS_PATH+'")'
            connectionNameParam = f'"{connectionName}"' if connectionName is not None else "None" 

            sim_script = f"""
from hera import toolkitHome, ToolkitHome
from hera.simulations.LSM.toolkit import LSMToolkit
import pandas
old_lsm_toolkit = toolkitHome.getToolkit(toolkitName=ToolkitHome.LSM, projectName="{self.projectName}", filesDirectory={"None" if filesDirectory is None else filesDirectory}, to_xarray={to_xarray}, to_database={to_database}, forceKeep={forceKeep}, connectionName={connectionNameParam})

lsm_template = old_lsm_toolkit.getTemplates(template="{templateName}")[0]
params={jsonConfig}
{read_stations_line if stations is not None else ""}
lsm_template.run(topography={topography}, stations={"None" if stations is None else "stations"},canopy={"None" if canopy is None else canopy},depositionRates={"None" if depositionRates is None else depositionRates}, saveMode="{saveMode}",simulationName="{simName}",**params)
"""
            with open(simSavePath, "w") as f:
                f.write(sim_script)

            simList += f"{simName}\n"


        script =f"""
cd "$dir"
python {RUN_SIM_FILE_NAME}
        """
        caseListFilePath = simulations_scripts_dir/ caseListFileName

        logger.info(f"Writing simulation list file")

        with open(caseListFilePath,"w") as outputFile:
            outputFile.write(simList)
        slurm.prepareSlurmScriptExecution(script=script,
                                          slurmExecutionFilePath=simulations_scripts_dir/slurmExecutionFileName,
                                          jobDirListFilePath=caseListFilePath,
                                          allocateProcessorsPerRun=allocateProcessorsPerRun,
                                          memoryInGB=memoryInGB,
                                          jobName=jobName,
                                          quiet=False,
                                          exclusive=exclusive)

Wind Profile

WindProfileToolkit

hera.simulations.windProfile.toolkit.WindProfileToolkit

Bases: abstractToolkit

Source code in hera/simulations/windProfile/toolkit.py
class WindProfileToolkit(toolkit.abstractToolkit):
    """
        Computes vertical wind profiles over a region from IMS station measurements
        and a land-cover derived surface roughness.
    """
    def __init__(self, projectName, filesDirectory=None,connectionName=None):
        """
            Initializes the toolkit.

        Parameters
        ----------
        projectName : str
            The project to work in.
        filesDirectory : str
            The directory for output files. Default is the current directory.
        connectionName : str
            The DB connection name.
        """
        super().__init__(projectName=projectName, toolkitName='WindProfileToolkit', filesDirectory=filesDirectory,connectionName=connectionName)
        self._presentation = presentation(dataLayer=self)

    def getWindProfile(self,xarray:xarray.DataArray, height:float, dz:float):
        """
        Returns dataframe with u and v velocities in different specified heights.

        Parameters
        ----------
        xarray: xarray.DataArray
            Xarray of the stations region. Must provide 'lon', 'lat', 'ws', 'wd' and
            'z0' fields; when an 'hc' (canopy height) coordinate is present the
            urban-canopy profile is used inside the canopy.

        height: float
            The maximum height to calculate velocities.

        dz: float
            Step size in meters.

        Returns
        -------
            pd.DataFrame
                One row per (lat, lon, height) with u, v, U_z and direction columns.
        """

        # Flatten the 2D grids into one row per grid point.
        df = pd.DataFrame()
        df['lon'] = xarray['lon'].values.flatten()
        df['lat'] = xarray['lat'].values.flatten()
        df['ws'] = xarray['ws'].values.flatten()
        df['wd'] = xarray['wd'].values.flatten()

        results = []
        # NOTE(review): 'hc' is looked up among the coords — confirm it is a
        # coordinate of the grid and not a data variable.
        fields = xarray.coords
        for _, row in df.iterrows():
            lat = row['lat']
            lon = row['lon']
            wind_speed = row['ws']
            wind_direction = row['wd']
            # Roughness length is taken from the nearest grid cell.
            i,j = self._find_lat_lon_index_in_xarray(lat,lon,xarray)
            z0 = float(xarray[i,j].z0.values)

            for z in np.arange(0, height + dz, dz):
                # NOTE(review): at z == 0 (and z <= z0) np.log(z / z0) is -inf /
                # negative, producing non-physical U_star values — confirm intended.
                U_star = (wind_speed * KARMAN) / np.log(z / z0)
                if 'hc' in fields:
                    hc = float(xarray[i,j].hc.values)
                    if hc > 2.0:          #Urban Area
                        if z > hc:
                            U_z = (U_star / KARMAN) * np.log(z / z0)
                        else:
                            # Inside the canopy: exponential decay below canopy top.
                            U_hc = (U_star/KARMAN) * np.log(hc/z0)
                            U_z = U_hc * np.exp(BETA * (z - hc) / xarray[i,j].ll.values)
                    else:
                        continue
                else:                       #Non-Urban Area
                    U_z = (U_star / KARMAN) * np.log(z / z0)

                # Decompose the speed into components using the mathematical angle.
                u = U_z * np.cos(np.radians(toMathematicalAngle(wind_direction)))
                v = U_z * np.sin(np.radians(toMathematicalAngle(wind_direction)))
                results.append({
                    'lat': lat,
                    'lon': lon,
                    'height': z,
                    'u': u,
                    'v': v,
                    'U_z': U_z,
                    'direction': wind_direction
                })

        return pd.DataFrame(results)

    def _find_lat_lon_index_in_xarray(self,lat,lon,xarray):
        """
            Returns the (i, j) grid index nearest to (lat, lon), minimizing
            |dlat| + |dlon| (an L1 distance in degrees).
        """
        latitudes = xarray['lat'].values
        longitudes = xarray['lon'].values

        lat_diff = np.abs(latitudes - lat)
        lon_diff = np.abs(longitudes - lon)

        total_diff = lat_diff + lon_diff
        min_index = np.unravel_index(np.argmin(total_diff), latitudes.shape)

        i, j = min_index
        return i,j

    def getSpatialWind(self,minlat,minlon,maxlat,maxlon,IMS_TOKEN,dxdy=30,inputCRS=WSG84,landcover_DataSource=None):
        """
            Builds the land-cover/roughness grid of the requested region and fetches
            the latest wind measurements from the IMS stations.

        Returns
        -------
            (xarray, stations_with_data) tuple.
        """
        landcover_tk = toolkitHome.getToolkit(toolkitHome.GIS_LANDCOVER,projectName=self.projectName)
        xarray = landcover_tk.getLandCover(minlat,minlon,maxlat,maxlon, dxdy,inputCRS=inputCRS,dataSourceName=landcover_DataSource)
        xarray = landcover_tk.getRoughnessFromLandcover(xarray,dxdy)       #For now is simple - only with dictionary
        with open(f'{os.path.dirname(os.path.abspath(__file__))}/wind_stations.json', 'r') as json_file:
            wind_stations = json.load(json_file)
        stations = list(wind_stations)
        stations_with_data = self._getWindSpeedDirection(stations,IMS_TOKEN)
        return xarray,stations_with_data

    def _getStationsInRegion(self,minlon,minlat,maxlon,maxlat,inputCRS):
        """
            Returns the stations whose ITM coordinates fall inside the given box.
        """
        min_pp = convertCRS(points=[[minlon, minlat]], inputCRS=inputCRS, outputCRS=ITM)[0]
        max_pp = convertCRS(points=[[maxlon, maxlat]], inputCRS=inputCRS, outputCRS=ITM)[0]
        # Fixed: was a CWD-relative 'wind_stations.json'; resolve next to this module
        # for consistency with getSpatialWind.
        with open(f'{os.path.dirname(os.path.abspath(__file__))}/wind_stations.json', 'r') as json_file:
            wind_stations = json.load(json_file)
        stations_in_region = []
        for station in wind_stations:
            lat = station['attributes'][2]['value']['latitude']
            lon = station['attributes'][2]['value']['longitude']
            point_ITM = convertCRS(points=[[lon, lat]], inputCRS=WSG84, outputCRS=ITM)[0]
            if min_pp.x <= point_ITM.x and max_pp.x >= point_ITM.x and min_pp.y <= point_ITM.y and max_pp.y >= point_ITM.y:
                stations_in_region.append(station)

        return stations_in_region

    def _getWindSpeedDirection(self,stations,IMS_TOKEN):
        """
            Queries the IMS 'latest' endpoint for each station and keeps the stations
            whose measurement time is within 15 minutes of the newest measurement.

        Returns
        -------
            list of [lat, lon, height, [ws, wd]] entries (empty if no station replied).
        """
        stations_with_data = []
        headers = {'Authorization': IMS_TOKEN['Authorization']}
        stations_datetimes = []
        for station_dict in tqdm(stations):
            lat = station_dict['attributes'][2]['value']['latitude']
            lon = station_dict['attributes'][2]['value']['longitude']
            station_id = station_dict['attributes'][0]['value']
            height = station_dict['height_above_sea']
            trials = 0
            data = None
            datetime_str = None
            while (trials < 20):
                try:
                    url = f'https://api.ims.gov.il/v1/envista/stations/{station_id}/data/latest'
                    response = requests.request('GET', url, headers=headers)
                    data = json.loads(response.text.encode('utf8'))
                    datetime_str = data['data'][0]['datetime']
                    print(f"{data['data'][0]['datetime']}, station: {station_dict['name']}")
                    break
                except Exception:
                    # Fixed: was a bare 'except:' which also swallowed
                    # KeyboardInterrupt/SystemExit.
                    trials += 1
            if data is not None and datetime_str is not None:
                # Fixed: ws/wd previously leaked from the previous station (or were
                # unbound on the first one) when a channel was missing.
                ws = None
                wd = None
                for channel in data['data'][0]['channels']:
                    if channel['name'] == 'WS':
                        ws = channel['value']
                    if channel['name'] == 'WD':
                        wd = channel['value']
                if ws is None or wd is None:
                    continue

                if abs(ws) < 20 and abs(wd) <= 360:
                    stations_with_data.append([lat, lon, height, [ws, wd]])
                    stations_datetimes.append(datetime_str)

        if not stations_datetimes:
            # Fixed: max() on an empty sequence raised ValueError.
            return []
        datetime_objects = [datetime.fromisoformat(dt) for dt in stations_datetimes]
        max_datetime = max(datetime_objects)
        threshold = timedelta(minutes=15)
        filtered_stations = [
            stations_with_data[i] for i, dt in enumerate(datetime_objects) if (max_datetime - dt) < threshold
        ]
        return filtered_stations

    def add_interpolated_ws_wd(self,xarray,stations):
        """
            Adds 'ws' and 'wd' fields to the grid by interpolating the station
            measurements to every grid point.
        """
        ws_wd = xr.apply_ufunc(
            self._interpolate_wd_ws,
            xarray['lon'], xarray['lat'],xarray['elevation'],
            vectorize=True,
            kwargs={'stations_with_data': stations},
            output_core_dims=[[], []],
        )
        xarray['ws'] = ws_wd[0]
        xarray['wd'] = ws_wd[1]
        return xarray


    def _interpolate_wd_ws(self,lon,lat,elevation,stations_with_data):
        # Interpolates at 10 m above the local elevation.
        result = spatialInterpolate().interp([lat, lon, elevation + 10.0], stations_with_data)
        ws, wd = result[0], result[1]
        return float(ws), float(wd)

getWindProfile(xarray: xarray.DataArray, height: float, dz: float)

Returns dataframe with u and v velocities in different specified heights.

Parameters:

Name Type Description Default
stations

Stations dataframe, with lon, lat, wind and direction columns. Direction should be in meteorological direction.

required
xarray DataArray

Xarray of stations region.

required
height float

The maximum height to calculate velocities.

required
dz float

Step size in meters.

required

Returns:

Type Description
pd.DataFrame
Source code in hera/simulations/windProfile/toolkit.py
def getWindProfile(self,xarray:xarray.DataArray, height:float, dz:float):
    """
    Returns dataframe with u and v velocities in different specified heights.

    Parameters
    ----------
    xarray: xarray.DataArray
        Xarray of the stations region. Must provide 'lon', 'lat', 'ws', 'wd' and
        'z0' fields; when an 'hc' (canopy height) coordinate is present the
        urban-canopy profile is used inside the canopy.

    height: float
        The maximum height to calculate velocities.

    dz: float
        Step size in meters.

    Returns
    -------
        pd.DataFrame
            One row per (lat, lon, height) with u, v, U_z and direction columns.
    """

    # Flatten the 2D grids into one row per grid point.
    df = pd.DataFrame()
    df['lon'] = xarray['lon'].values.flatten()
    df['lat'] = xarray['lat'].values.flatten()
    df['ws'] = xarray['ws'].values.flatten()
    df['wd'] = xarray['wd'].values.flatten()

    results = []
    # NOTE(review): 'hc' is looked up among the coords — confirm it is a
    # coordinate of the grid and not a data variable.
    fields = xarray.coords
    for _, row in df.iterrows():
        lat = row['lat']
        lon = row['lon']
        wind_speed = row['ws']
        wind_direction = row['wd']
        # Roughness length is taken from the nearest grid cell.
        i,j = self._find_lat_lon_index_in_xarray(lat,lon,xarray)
        z0 = float(xarray[i,j].z0.values)

        for z in np.arange(0, height + dz, dz):
            # NOTE(review): at z == 0 (and z <= z0) np.log(z / z0) is -inf /
            # negative, producing non-physical U_star values — confirm intended.
            U_star = (wind_speed * KARMAN) / np.log(z / z0)
            if 'hc' in fields:
                hc = float(xarray[i,j].hc.values)
                if hc > 2.0:          #Urban Area
                    if z > hc:
                        U_z = (U_star / KARMAN) * np.log(z / z0)
                    else:
                        # Inside the canopy: exponential decay below canopy top.
                        U_hc = (U_star/KARMAN) * np.log(hc/z0)
                        U_z = U_hc * np.exp(BETA * (z - hc) / xarray[i,j].ll.values)
                else:
                    # Grid cells with 'hc' present but a shallow canopy are skipped.
                    continue
            else:                       #Non-Urban Area
                U_z = (U_star / KARMAN) * np.log(z / z0)

            # Decompose the speed into components using the mathematical angle.
            u = U_z * np.cos(np.radians(toMathematicalAngle(wind_direction)))
            v = U_z * np.sin(np.radians(toMathematicalAngle(wind_direction)))
            results.append({
                'lat': lat,
                'lon': lon,
                'height': z,
                'u': u,
                'v': v,
                'U_z': U_z,
                'direction': wind_direction
            })

    return pd.DataFrame(results)

Gaussian Dispersion

gaussianToolkit

hera.simulations.gaussian.toolkit.gaussianToolkit

Bases: abstractToolkit

Source code in hera/simulations/gaussian/toolkit.py
class gaussianToolkit(abstractToolkit):
    """
        A toolkit for simple Gaussian dispersion calculations: provides meteorology
        factories and gas-cloud construction with a pluggable sigma parameterization.
    """

    # Maps sigma-type name -> sigma class; populated in __init__.
    _sigmaDict = None

    def __init__(self, projectName: str, filesDirectory: str = None, connectionName=None):
        """
            Initializes the toolkit
        Parameters
        ----------
        projectName : str
            The project to work in.
        filesDirectory : str
            The directory for output files; default is the current directory.
        connectionName : str
            The DB connection name.
        """
        super().__init__(projectName=projectName, toolkitName="gaussianToolkit", filesDirectory=filesDirectory, connectionName=connectionName)
        self._sigmaDict = dict(briggsRural=BriggsRural)

    def getSigmaType(self,sigmaName):
        """
            Returns an instance of the sigma parameterization registered under sigmaName.

        Parameters
        ----------
        sigmaName : str
            The name of the sigma type (see listSigmaTypes for the valid names).

        Returns
        -------
            An instance of the requested sigma class.

        Raises
        ------
        ValueError
            If sigmaName is not a registered sigma type.
        """
        try:
            sigmaCls = self._sigmaDict[sigmaName]
        except KeyError as e:
            err = f"The type {sigmaName} is not found. Must be one of {','.join(self.listSigmaTypes())}"
            # Chain the original KeyError for easier debugging.
            raise ValueError(err) from e
        return sigmaCls()

    def listSigmaTypes(self):
        """
            Returns the list of registered sigma type names.

        Returns
        -------
        list of str
        """
        return list(self._sigmaDict)


    def getMeteorologyFromU10(self, u10, inversion, verticalProfileType="log", temperature=20*celsius, stability="D",
                              z0=0.1*m, ustar=0.3*m/s, skinSurfaceTemperature=35*celsius):
        """
            Builds a meteorology object from a 10 m wind speed; delegates to
            MeteorologyFactory.
        """
        return MeteorologyFactory().getMeteorologyFromU10(u10=u10, inversion=inversion, verticalProfileType=verticalProfileType,
                    temperature=temperature, stability=stability, z0=z0, ustar=ustar, skinSurfaceTemperature=skinSurfaceTemperature)


    def getMeteorologyFromURefHeight(self, u, refHeight, inversion, verticalProfileType="log", temperature=20*celsius, stability="D",
                              z0=0.1*m, ustar=0.3*m/s, skinSurfaceTemperature=35*celsius):
        """
            Builds a meteorology object from a wind speed at an arbitrary reference
            height; delegates to MeteorologyFactory.
        """
        return MeteorologyFactory().getMeteorologyFromURefHeight(u=u, refHeight=refHeight,  inversion=inversion,
                    verticalProfileType=verticalProfileType, temperature=temperature, stability=stability, z0=z0,
                    ustar=ustar,skinSurfaceTemperature=skinSurfaceTemperature)


    def getGasCloud(self, sourceQ, sourceHeight, initialCloudSize, sigmaTypeName="briggsRural"):
        """
            Builds a gas cloud object for a dispersion calculation.

        Parameters
        ----------
        sourceQ : unum, method
            If unum:
                The unit determines the release type.
                [mass] - Instantaneous
                [mass/time] - Continuous
            else
                Continuous (not implemented yet.)

        sourceHeight : unum
        initialCloudSize : 3-tuple unum, the sigmas in each axis.
        sigmaTypeName : Name of the sigma type, for example from Briggs, rural/urban.

        Returns
        -------
        An instance of the gas cloud class.

        """
        sigmaType = self.getSigmaType(sigmaTypeName)
        gascloud = abstractGasCloud.createGasCloud(sourceQ=sourceQ,sourceHeight=sourceHeight,initialCloudSize=initialCloudSize,sigmaType=sigmaType)
        return gascloud

__init__(projectName: str, filesDirectory: str = None, connectionName=None)

Initializes the toolkit

Parameters:

Name Type Description Default
projectName str
required
filesDirectory str
None
Source code in hera/simulations/gaussian/toolkit.py
def __init__(self, projectName: str, filesDirectory: str = None, connectionName=None):
    """
        Initializes the toolkit
    Parameters
    ----------
    projectName : str
        The project to work in.
    filesDirectory : str
        The directory for output files; default is the current directory.
    connectionName : str
        The DB connection name.
    """
    super().__init__(projectName=projectName, toolkitName="gaussianToolkit", filesDirectory=filesDirectory, connectionName=connectionName)
    # Registry of sigma parameterizations available through getSigmaType.
    self._sigmaDict = dict(briggsRural=BriggsRural)

getSigmaType(sigmaName)

Parameters:

Name Type Description Default
sigmaName
required
Source code in hera/simulations/gaussian/toolkit.py
def getSigmaType(self,sigmaName):
    """
        Look up a registered sigma parameterization by name and return a fresh
        instance of it.

    Parameters
    ----------
    sigmaName : str
        The registered name of the sigma type (see listSigmaTypes).

    Returns
    -------
        An instance of the requested sigma class.

    Raises
    ------
    ValueError
        If sigmaName is not a registered sigma type.
    """
    if sigmaName not in self._sigmaDict:
        err = f"The type {sigmaName} is not found. Must be one of {','.join(self.listSigmaTypes())}"
        raise ValueError(err)
    return self._sigmaDict[sigmaName]()

listSigmaTypes()

Print the list of sigma types
Source code in hera/simulations/gaussian/toolkit.py
def listSigmaTypes(self):
    """
        Return the names of the registered sigma types.

    Returns
    -------
    list of str
    """
    return list(self._sigmaDict)

getGasCloud(sourceQ, sourceHeight, initialCloudSize, sigmaTypeName='briggsRural')

Parameters:

Name Type Description Default
sourceQ (unum, method)

If unum: The unit determines the release time. [mass] - Instantaneous; [mass/time] - Continuous; else Continuous (not implemented yet.)

required
sourceHeight unum
required
initialCloudSize 3-touple unum, the sigmas in each axis.
required
sigmaTypeName Name of the sigma type, for example from Briggs, rural/urban.
'briggsRural'

Returns:

Type Description
An instance of the class gasCloud
Source code in hera/simulations/gaussian/toolkit.py
def getGasCloud(self, sourceQ, sourceHeight, initialCloudSize, sigmaTypeName="briggsRural"):
    """
        Builds a gas cloud object for a dispersion calculation.

    Parameters
    ----------
    sourceQ : unum, method
        If unum, the unit determines the release type:
        [mass] - Instantaneous; [mass/time] - Continuous.
        Otherwise continuous (not implemented yet).

    sourceHeight : unum
        The height of the source.
    initialCloudSize : 3-tuple of unum
        The sigmas of the initial cloud in each axis.
    sigmaTypeName : str
        Name of the sigma type, for example from Briggs, rural/urban.

    Returns
    -------
    An instance of the gas cloud class.
    """
    chosenSigma = self.getSigmaType(sigmaTypeName)
    return abstractGasCloud.createGasCloud(sourceQ=sourceQ,
                                           sourceHeight=sourceHeight,
                                           initialCloudSize=initialCloudSize,
                                           sigmaType=chosenSigma)

Workflow

hermesWorkflowToolkit

hera.simulations.hermesWorkflowToolkit.hermesWorkflowToolkit

Bases: abstractToolkit

Manages the hermes workflows:

1. Checks if they are in the DB.
2. create a new name to them
3. allows simple deletion
4. allows simple comparison.
5. retrieve by initial name.
Source code in hera/simulations/hermesWorkflowToolkit.py
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
class hermesWorkflowToolkit(abstractToolkit):
    """
        Manages the hermes worflows:

            1. Checks if they are in the DB.
            2. create a new name to them
            3. allows simple deletion
            4. allows simple comparison.
            5. retrieve by initial name.
    """
    DESC_GROUPNAME = "groupName"
    DESC_GROUPID = "groupID"
    DESC_WORKFLOWNAME = "workflowName"
    DESC_PARAMETERS = "parameters"

    DOCTYPE_WORKFLOW = "hermesWorkflow"

    DOCKIND_CACHE = "Cache"
    DOCKIND_SIMULATIONS = "Simulations"

    def __init__(self, projectName: str, filesDirectory: str = None, toolkitName: str = "hermesWorkflowToolkit", connectionName=None):
        """
            Initializes the workflow toolkit.

        Parameters
        ----------
        projectName: str
            The project that the workflow will be used in.

        filesDirectory : str
            The directory to write all the workflows and the outputs. Default is the
            current directory.

        toolkitName : str
            The name under which this toolkit registers itself.

        connectionName : str
            The DB connection to use.
        """
        super().__init__(projectName=projectName,
                         toolkitName=toolkitName,
                         connectionName=connectionName,
                         filesDirectory=filesDirectory)

    def listHermesSolverTemplates(self, solverName):
        """
            Returns all the templates that were loaded for the requested solver,
            as a pandas.DataFrame indexed by the template name.

        Parameters
        ----------
        solverName : str
            The solver whose templates are listed.

        Returns
        -------
        pandas.DataFrame
            Empty DataFrame when no template matches.
        """
        templateDescriptions = [dict(doc.desc) for doc in self.getDataSourceDocumentsList(solver=solverName)]

        if templateDescriptions:
            return pandas.DataFrame(templateDescriptions).set_index("name")
        return pandas.DataFrame()

    def getHermesFlowTemplate(self, hermesFlowName):
        """
            Get a hermes flow template from the datasource documents.

        Parameters
        ----------
        hermesFlowName : str
            The name of the flow template to retrieve.

        Returns
        -------
            The datasource data of the requested flow template.
        """
        return self.getDataSourceData(hermesFlowName, desc__component="Flow")

    def listHermesNodesTemplates(self):
        """
            Returns all the node templates that were loaded, as a pandas.DataFrame
            indexed by nodeName.

        Returns
        -------
        pandas.DataFrame
            Empty DataFrame when no node template is found.
        """
        nodeRows = [dict(doc.desc['desc'], nodeName=doc.desc['datasourceName'])
                    for doc in self.getDataSourceDocumentsList(desc__component="Node")]

        if nodeRows:
            return pandas.DataFrame(nodeRows).set_index("nodeName")
        return pandas.DataFrame()

    def getHermesNodeTemplate(self, hermesNodeName):
        """
            Get a hermes node template from the datasource documents.

        Parameters
        ----------
        hermesNodeName : str
            The name of the node template to retrieve.

        Returns
        -------
            The datasource data of the requested node template.
        """
        return self.getDataSourceData(hermesNodeName, desc__component="Node")

    def getHemresWorkflowFromDocument(self, documentList, returnFirst=True):
        """
            Return a hermes-workflow (or a list of hermes workflows) to the user.

        Parameters
        ----------
        documentList : list, document
            A hera.datalayer document or a list of documents.

        returnFirst : bool
            If true, return the object of only the first item in the list (if it is a list).

        Returns
        -------
            hermes workflow object (or one of its derivatives).

        Raises
        ------
        ValueError
            If returnFirst is True and documentList is empty.
        """
        logger = get_classMethod_logger(self, "getHemresWorkflowFromDocument")

        docList = documentList if isinstance(documentList, list) else [documentList]

        if returnFirst:
            if len(docList) == 0:
                # Fixed: previously only logged the error and then crashed with an
                # IndexError on docList[0]; raise explicitly instead.
                err = "can't get first hermes workflow since documentList is empty"
                logger.error(err)
                raise ValueError(err)
            doc = docList[0]
            ret = self.getHermesWorkflowFromJSON(doc.desc['workflow'], name=doc.desc['workflowName'], resource=doc['resource'])
        else:
            ret = [self.getHermesWorkflowFromJSON(doc.desc['workflow'], name=doc.desc['workflowName'], resource=doc['resource']) for doc in
                   docList]

        return ret

    def getHermesWorkflowFromJSON(self, workflow: Union[dict, str], name=None, resource=None):
        """
            Creates a hermes workflow object from the supplied JSON.

            The JSON can be either a file name, a JSON string or a dictionary.

        Parameters
        ----------
        workflow: dict,str
            The workflow definition (file path, JSON string, or dict).

        name : str
            The name to give the created workflow object.

        resource : str
            The resource path passed to the workflow constructor.

        Returns
        -------
            hermesWorkflow object.

        Raises
        ------
        ValueError
            If no workflow class matches the requested solver.
        """
        logger = get_classMethod_logger(self, "getHermesWorkflowFromJSON")
        # loadJSON accepts a file path, a JSON string, or a dict.
        parsedWorkflow = loadJSON(workflow)

        # The 'solver' field selects the workflow class: the generic hermes.workflow
        # when absent, otherwise a solver-specific subclass. pydoc.locate imports
        # and returns the class dynamically.
        solverName = parsedWorkflow['workflow'].get('solver', None)
        if solverName is None:
            workflowClass = pydoc.locate("hermes.workflow")
        else:
            workflowClass = pydoc.locate(f"hera.simulations.openFoam.OFWorkflow.workflow_{solverName}")

        if workflowClass is None:
            err = f"The workflow type {solverName} not found"
            logger.error(err)
            raise ValueError(err)

        return workflowClass(parsedWorkflow, name=name, Resource_path=resource)

    def updateDocumentWorkflow(self, document, workflow):
        """Persist a workflow's JSON and parameters onto a simulation document.

        Parameters
        ----------
        document : MetadataFrame
            The simulation document to update.
        workflow : hermes.workflow
            The workflow object whose JSON and parameters will be stored.
        """
        description = document.desc
        description['workflow'] = workflow.json
        description['parameters'] = workflow.parametersJSON
        document.save()


    def getHermesWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource: Union[dict, str, list, workflow],
                                returnFirst=True, **query):
        """
                Retrieve workflows from the DB as hermes.workflow objects (or its
                derivatives).

                A string input is used as a name; a dict input is used as a filter on
                the parameters. When returnFirst is True a single workflow is
                returned, otherwise a list with all the results of the query.

        Parameters
        ----------
        nameOrWorkflowFileOrJSONOrResource: str, dict
            The filtering criteria. Either a name, or the parameters of the flow.

        returnFirst : bool
            If true, return only the first object (if several results were found in the DB).

        query: arguments
            Additional query criteria.

        Returns
        -------
            list (returnFirst is False), a hermes workflow, or None when nothing matches.
        """
        logger = get_classMethod_logger(self, "getHermesWorkflowFromDB")
        matchingDocuments = self.getWorkflowListDocumentFromDB(nameOrWorkflowFileOrJSONOrResource, **query)
        if len(matchingDocuments) == 0:
            logger.error(f"... not found. ")
            return None
        return self.getHemresWorkflowFromDocument(documentList=matchingDocuments, returnFirst=returnFirst)

    def getWorkflowDocumentByName(self,name,doctype=None, dockind="Simulations",**query):
        """
            Retrieve the simulation using only the name.

            For a more sophisticated retrieval (that tries to retrieve by group name
            and by the content of the workflow) use getWorkflowDocumentFromDB.

        Parameters
        ----------
        name : str
            The name of the workflow.

        doctype  : string
            document type. Defaults to DOCTYPE_WORKFLOW.

        dockind  : string
            Whether the document is cached ("Cache") or a simulation ("Simulations").

        query : dict
            Additional criteria.

        Returns
        -------
            The first matching document, or None when no document matches.
        """
        # Fixed: logger was named after getWorkflowDocumentFromDB (copy-paste error).
        logger = get_classMethod_logger(self, "getWorkflowDocumentByName")
        doctype = self.DOCTYPE_WORKFLOW if doctype is None else doctype
        # try to find it as a name
        mongo_crit = dictToMongoQuery(query)

        # Dynamic dispatch to the retrieval method selected by dockind.
        retrieve_func = getattr(self, f"get{dockind}Documents")

        logger.info(f"Searching for {name} as a name of kind {dockind}")
        docList = retrieve_func(workflowName=name, type=doctype, **mongo_crit)
        return None if len(docList) == 0 else docList[0]


    def getWorkflowDocumentFromDB(self, nameOrWorkflowFileOrJSONOrResource, doctype=None, dockind="Simulations",**query):
        """
            Tries to find the item as a name, workflow directory, group name or through the resource.
            Additional queries are also applicable.

        Parameters
        ----------
        nameOrWorkflowFileOrJSONOrResource : string or dict
            The name/dict that defines the item.

        doctype : string
            The document type. Defaults to DOCTYPE_WORKFLOW when None.

        dockind : string
            Whether the document is Cache or Simulations.

        query : dict
            Additional criteria.

        Returns
        -------
            A list of matching documents, or an empty list if not found.
        """
        logger = get_classMethod_logger(self, "getWorkflowDocumentFromDB")
        doctype = self.DOCTYPE_WORKFLOW if doctype is None else doctype
        mongo_crit = dictToMongoQuery(query)

        # Dynamic dispatch: retrieve from Simulations or Cache collection
        # based on dockind parameter ("Simulations" or "Cache").
        retrieve_func = getattr(self, f"get{dockind}Documents")

        if isinstance(nameOrWorkflowFileOrJSONOrResource, str):
            # Cascading search strategy for string inputs:
            # Try each identification method in order, stopping at first match.
            # 1) workflowName — exact name match (e.g. "flow_0001")
            logger.info(f"Searching for {nameOrWorkflowFileOrJSONOrResource} as a name of kind {dockind}")
            docList = retrieve_func(workflowName=nameOrWorkflowFileOrJSONOrResource, type=doctype, **mongo_crit)
            if len(docList) == 0:
                # 2) resource — match by file/directory path
                logger.info(f"Searching for {nameOrWorkflowFileOrJSONOrResource} as a resource of kind {dockind}.")
                docList = retrieve_func(resource=nameOrWorkflowFileOrJSONOrResource, type=doctype, **mongo_crit)
                if len(docList) == 0:
                    # 3) groupName — match all workflows in a group (e.g. "flow")
                    logger.info(
                        f"Searching for {nameOrWorkflowFileOrJSONOrResource} as a workflow group of kind {dockind}.")
                    docList = retrieve_func(groupName=nameOrWorkflowFileOrJSONOrResource, type=doctype, **mongo_crit)
                    if len(docList) == 0:
                        # 4) JSON content — try to parse string as JSON file path
                        # or JSON string, extract parameters, and query by parameter values.
                        # This enables finding a workflow by its content rather than name.
                        logger.info(f"... not found. Try to query as a json. ")
                        try:
                            jsn = loadJSON(nameOrWorkflowFileOrJSONOrResource)
                            wf = self.getHermesWorkflowFromJSON(jsn, resource=nameOrWorkflowFileOrJSONOrResource)
                            # Flatten the workflow parameters to MongoDB query format
                            # using dictToMongoQuery with "parameters" prefix.
                            currentQuery = dictToMongoQuery(wf.parametersJSON, prefix="parameters")
                            currentQuery.update(mongo_crit)
                            # NOTE(review): this branch queries with DOCTYPE_WORKFLOW
                            # rather than the doctype argument — confirm intentional.
                            docList = retrieve_func(type=self.DOCTYPE_WORKFLOW, **currentQuery)
                        except ValueError as e:

                            # logger.debug(f"Searching for {nameOrWorkflowFileOrJSONOrResource} as a file.")
                            # if os.path.isfile(nameOrWorkflowFileOrJSONOrResource):
                            #     from ..datalayer.document import nonDBMetadataFrame
                            #     workflowName = os.path.basename(nameOrWorkflowFileOrJSONOrResource).split(".")[0]
                            #     grpTuple = workflowName.split("_")
                            #     groupName = grpTuple[0]
                            #     groupID = grpTuple[1] if len(grpTuple) > 1 else 0
                            #     hermesWF = self.getHermesWorkflowFromJSON(nameOrWorkflowFileOrJSONOrResource)
                            #     res = nonDBMetadataFrame(data=None,
                            #                              projecName=self.projectName,
                            #                              resource=os.path.join(self.FilesDirectory,
                            #                                                    nameOrWorkflowFileOrJSONOrResource),
                            #                              dataFormat=datatypes.STRING,
                            #                              type=self.DOCTYPE_WORKFLOW,
                            #                              groupName=groupName,
                            #                              groupID=groupID,
                            #                              workflowName=workflowName,
                            #                              workflowType=hermesWF.workflowType,
                            #                              workflow=hermesWF.json,
                            #                              parameters=hermesWF.parametersJSON
                            #                              )
                            #     docList = [res]
                            # else:
                            err = f"Error {e} when trying to load as JSON."
                            logger.error(err)
                            docList = []
                        except IsADirectoryError:
                            # loadJSON was handed a directory path — treat as not found.
                            logger.debug(f"not found")
                            docList = []
                    else:
                        logger.info(f"... Found it as workflow group ")
                else:
                    logger.info(f"... Found it as resource ")
            else:
                logger.info(f"... Found it as name")

        elif isinstance(nameOrWorkflowFileOrJSONOrResource, dict) or isinstance(nameOrWorkflowFileOrJSONOrResource,
                                                                                workflow):
            # dict/workflow inputs: match by the workflow's parameter values.
            if isinstance(nameOrWorkflowFileOrJSONOrResource, workflow):
                qryDict = nameOrWorkflowFileOrJSONOrResource.parametersJSON
            else:
                wrkflw = self.getHermesWorkflowFromJSON(nameOrWorkflowFileOrJSONOrResource)
                qryDict = wrkflw.parametersJSON

            logger.debug(f"Searching for {qryDict} using parameters")
            currentQuery = dictToMongoQuery(qryDict, prefix="parameters")
            currentQuery.update(mongo_crit)
            docList = retrieve_func(**currentQuery, type=self.DOCTYPE_WORKFLOW)
        else:
            # Unsupported input type — nothing to search for.
            docList = []

        return docList


    def getWorkflowListDocumentFromDB(self, nameOrWorkflowFileOrJSONOrResource: Union[dict, str, list, workflow],
                                      **query):
        """
            Returns the workflow document(s) from the DB.

            Each requested item is identified from:
             - Resource (the directory name)
             - Simulation name
             - Its workflow
             - workflow dict.

            A list input is resolved item by item and the results are concatenated.

        Parameters
        ----------
        nameOrWorkflowFileOrJSONOrResource: str, dict, list, workflow

        Can be
             - Resource (the directory name)
             - Simulation name
             - Its workflow
             - workflow dict.

        query : dict
            Additional query criteria for the DB.

        Returns
        -------
            A (possibly empty) list of documents.
        """

        if isinstance(nameOrWorkflowFileOrJSONOrResource, list):
            docList = []
            for simulationItem in nameOrWorkflowFileOrJSONOrResource:
                # Fix: forward the additional **query criteria; previously they
                # were accepted by this method but silently dropped.
                docList += self.getWorkflowDocumentFromDB(simulationItem, **query)
        else:
            docList = self.getWorkflowDocumentFromDB(nameOrWorkflowFileOrJSONOrResource, **query)

        return docList

    def getWorkflowListOfSolvers(self, solverName: str, **query):
        """
            Fetch every workflow document that belongs to the requested solver.

        Parameters
        ----------
        solverName : str
            The name of the solver.
        query : param-list
            Additional query criteria.

        Returns
        -------
            list of documents.
        """
        workflowType = self.DOCTYPE_WORKFLOW
        return self.getSimulationsDocuments(solver=solverName,
                                            type=workflowType,
                                            **query)

    def getWorkflowDocumentsInGroup(self, groupName: str, **kwargs):
        """
            Return the documents of all the workflows that belong to the given group.

            Only documents of the workflow type (DOCTYPE_WORKFLOW) are returned.

        Parameters
        ----------

        groupName : str
            The group name shared by all the runs.

        kwargs: additional filtering criteria.
                Use mongodb criteria.

        Returns
        -------
            list of mongo documents.

        """
        return self.getSimulationsDocuments(groupName=groupName, type=self.DOCTYPE_WORKFLOW, **kwargs)

    def findAvailableName(self, simulationGroup: str, **kwargs):
        """
            Finds the next available name of that prefix, using the per-group counter.

            We assume that the name of each run is:
                <prefix name>_<id>.

        Parameters
        ----------
        simulationGroup : str
            The simulation group.

        **kwargs : dict
                Additional filters.
                Note that these should be the full path of the parameter in the JSON.

        Returns
        -------
            int, str
            The new ID,
            The name.
        """
        # NOTE(review): the counter key is prefixed with "simulations_", which
        # differs from the un-prefixed group counter used by addWorkflowToGroup —
        # confirm the two counters are intentionally separate.
        newID = self.getCounterAndAdd(f"simulations_{simulationGroup}")
        return newID, self.getworkFlowName(simulationGroup, newID)

    @staticmethod
    def getworkFlowName(baseName, flowID):
        """
            Returns the name of the flow field from the base and the
            flow id.

            The name is <base>_<id>
            where <id> is padded.

        Parameters
        ----------
        baseName : str
                The base name
        flowID: int
                the id of the name.

        Returns
        -------

        """
        formatted_number = "{0:04d}".format(flowID)
        return f"{baseName}_{formatted_number}"

    @staticmethod
    def splitWorkflowName(workflow_name:str):
        """splits workflow name into base name and the flow id.

        Returns
        -------
        base name and the flow id, both as strings

        """
        if "_" in workflow_name:
            split_name = workflow_name.split("_")
            return "".join(split_name[:-1]), split_name[-1]
        return workflow_name, None

    def addWorkflowFileInGroup(self, workflowFilePath, write_file=False):
        """Adds the workflow file to the database, assigning the group based on the name.

        The group is the part of the file name before the trailing _<id>.
        If a document with that workflow name already exists, it is returned as-is.

        Parameters
        ----------
        workflowFilePath : str
            Path to the workflow JSON file. Must reside under self.FilesDirectory.
        write_file : bool
            If true, also write the workflow JSON to the disk.

        Returns
        -------
            The document of the workflow.

        Raises
        ------
        NotImplementedError
            If the 'hermes' library is not installed.
        ValueError
            If the file is not inside the toolkit files directory.
        """
        # Fix: the logger was created but never bound, so nothing was logged.
        logger = get_classMethod_logger(self, "addWorkflowFileInGroup")
        if workflow is None:
            raise NotImplementedError("addWorkflowFileInGroup() requires the 'hermes' library, which is not installed")

        workflowFileName = os.path.basename(workflowFilePath)
        workflowName = os.path.splitext(workflowFileName)[0]
        if not os.path.abspath(workflowFilePath).startswith(self.FilesDirectory):
            raise ValueError(f"{os.path.abspath(workflowFilePath)} is not in {self.FilesDirectory}")
        doc = self.getWorkflowDocumentByName(workflowName)

        if doc is None:
            logger.info(f"{workflowName} not found in the DB, adding it to its group.")
            doc = self.addWorkflowToGroup(workflowFilePath, self.splitWorkflowName(workflowName)[0],
                                          writeWorkflowToFile=write_file, resource=workflowFilePath)
        return doc


    def addWorkflowToGroup(self, workflowJSON: str, groupName: str, writeWorkflowToFile:bool=False, resource=None):
        """
            Adds the workflow to the database, or returns the existing document.

            The workflow is first searched in the DB by its parameter values.
            If a matching document is found, it is returned as-is (no duplicate
            is created). Otherwise, a new name <groupName>_<id> is generated from
            the group counter and the workflow is added under it.

        Parameters
        ----------
        workflowJSON : str
            The file name that contains the workflow (any input accepted by loadJSON).

        groupName : str
            The group to assign the workflow to.

        writeWorkflowToFile : bool
            If true, then write the JSON file to the disk.

        resource : str, optional
            The resource path to store in the document. When None, defaults to
            <FilesDirectory>/<workflowName>.json.

        Returns
        -------
            The document of the new workflow.
        """
        logger = get_classMethod_logger(self, "addWorkflowToGroup")
        if workflow is None:
            raise NotImplementedError("addWorkflowToGroup() requires the 'hermes' library, which is nor installed")

        # Idempotent add: first check if this exact workflow already exists
        # in the DB (matched by its parameter values). If found, return the
        # existing document without creating a duplicate.
        workflowData = loadJSON(workflowJSON)
        docList = self.getWorkflowDocumentFromDB(workflowData)
        if len(docList) > 0:
            logger.info(f"...Found. Returning the document.")
            doc = docList[0]
        else:
            # New workflow: generate a unique name using the group counter.
            # Naming convention: <groupName>_<padded_id> (e.g. "flow_0001").
            # The counter is per-group, stored in the project config.
            logger.info("...Not Found, adding the input to the DB")
            groupID = self.getCounterAndAdd(groupName)
            workflowName = self.getworkFlowName(groupName, groupID)
            resource = os.path.abspath(resource) if resource else (os.path.join(self.FilesDirectory, workflowName) + ".json")
            hermesWF = workflow(workflowData, Resource_path=resource)
            # Store the full workflow JSON + extracted parameters in the document.
            # The parameters are stored separately to enable efficient querying
            # via dictToMongoQuery without parsing the full workflow tree.
            doc = self.addSimulationsDocument(resource=resource,
                                              dataFormat=datatypes.STRING,
                                              type=self.DOCTYPE_WORKFLOW,
                                              desc=dict(
                                                  groupName=groupName,
                                                  groupID=groupID,
                                                  workflowName=workflowName,
                                                  solver=hermesWF.solver,
                                                  workflow=hermesWF.json,
                                                  parameters=hermesWF.parametersJSON)
                                              )

        if writeWorkflowToFile:
            # Rebuild the resource path from the stored document so the file
            # is written for both newly-added and pre-existing documents.
            workflowName = self.getworkFlowName(doc.desc['groupName'], doc.desc['groupID'])
            resource = os.path.abspath(resource) if resource else (os.path.join(self.FilesDirectory, workflowName) + ".json")
            hermesWF = workflow(doc.desc['workflow'], WD_path=self.FilesDirectory, Resource_path=resource)
            with open(os.path.join(resource), "w") as outFile:
                json.dump(hermesWF.json, outFile, indent=4)


        return doc


    def executeWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource):
        """
            Builds and executes the workflow(s) through the Luigi pipeline.

            Note that it is only executing the workflow.
            For OpenFOAM simulations, use the runOFSimulation method that will build the case
            and then run it.

            Note the procedure removes the [name]_targetFiles directory before running.

        Parameters
        ----------
        nameOrWorkflowFileOrJSONOrResource: str, dict

        Can be
             - Resource (the directory name)
             - Simulation name
             - Its workflow
             - workflow dict.

        Returns
        -------
            None
        """
        logger = get_classMethod_logger(self, "executeWorkflow")
        docList = self.getWorkflowListDocumentFromDB(nameOrWorkflowFileOrJSONOrResource)

        for doc in docList:
            workflowJSON = doc.desc['workflow']
            workflowName = doc.desc['workflowName']
            logger.info(f"Processing {workflowName}")

            # Step 1: Reconstruct the hermes workflow object from the stored JSON.
            # The workflow class is resolved dynamically via pydoc.locate based on
            # the 'solver' field (generic hermes.workflow or solver-specific subclass).
            hermesWF = self.getHermesWorkflowFromJSON(workflowJSON, name=workflowName, resource=doc['resource'])

            # Step 2: Build the workflow into a Luigi task DAG.
            # hermes.build() traverses the workflow node tree, wraps each node in a
            # Luigi task, and returns the Python source code for the task module.
            logger.info(f"Building and executing the workflow {workflowName}")
            build = hermesWF.build(buildername=workflow.BUILDER_LUIGI)

            # Step 3: Write the workflow JSON and generated Python module to disk.
            # The JSON is written to the resource path; the Python module contains
            # the Luigi task definitions that will be executed.
            logger.info(f"Writing the workflow and the executer python {workflowName}")
            wfFileName = hermesWF.Resource_path
            hermesWF.write(wfFileName)

            pythonFileName = os.path.join(self.FilesDirectory, f"{workflowName}.py")
            with open(pythonFileName, "w") as outFile:
                outFile.write(build)

            # Step 4: Clean previous execution artifacts (Luigi target files).
            # Luigi uses target files to track task completion. Removing them
            # forces all tasks to re-execute from scratch.
            logger.debug(f"Removing the targetfiles and execute")
            executionfileDir = os.path.join(self.FilesDirectory, f"{workflowName}_targetFiles")
            shutil.rmtree(executionfileDir, ignore_errors=True)

            # Step 5: Execute the Luigi pipeline via command line.
            # 'finalnode_xx_0' is the terminal task that triggers the full DAG.
            # --local-scheduler avoids requiring a separate Luigi scheduler process.
            pythonPath = os.path.join(self.FilesDirectory, f"{workflowName}")
            executionStr = f"python3 -m luigi --module {os.path.basename(pythonPath)} finalnode_xx_0 --local-scheduler"
            logger.debug(executionStr)
            os.system(executionStr)

            # Step 6: Clean up the generated Python module (the workflow JSON stays).
            logger.info(f"Cleaning the executer python for {workflowName}")
            os.remove(pythonFileName)

    def compareWorkflowObj(self,
                           workflowList,
                           longFormat: bool = False):
        """
            Compare the parameters of the given workflow objects to each other.

        Parameters
        ----------
        workflowList : list of hermes workflow objects
            The workflows to compare.
        longFormat : bool
            If True, return the comparison in long format.

        Returns
        -------
            The comparison structure produced by compareJSONS.
        """
        namedParameters = {wf.name: wf.parametersJSON for wf in workflowList}
        return compareJSONS(longFormat=longFormat,
                            changeDotToUnderscore=True,
                            **namedParameters)


    def compareWorkflows(self,
                         Workflow: Union[list, str],
                         longFormat: bool = False,
                         transpose: bool = False) -> Union[dict, pandas.DataFrame]:
        """
            Compares two or more hermes workflows.

        Parameters
        ----------
        Workflow : str, list
            A single input is used as a group name;
            a list is the list of workflow names to compare.

        longFormat : bool
            If True, return the results in long format rather than in a wide table.

        transpose : bool
            If True, return the simulation names as rows.

        Returns
        -------
            pandas.DataFrame (or long-format structure, depending on the flags)
            with the differences between the parameters of the requested workflows.
        """
        logger = get_classMethod_logger(self, "compareWorkflow")
        if Workflow is None:
            # Fix: typo in the error message ("nor" -> "not").
            raise NotImplementedError("compare() requires the 'hermes' library, which is not installed")
        logger.info("--- Start ---")

        workflowList = []
        for workflowName in numpy.atleast_1d(Workflow):
            simulationList = self.getWorkflowListDocumentFromDB(workflowName)
            if len(simulationList) == 0:
                # Fall back to interpreting the item as a workflow file on disk.
                if os.path.isfile(workflowName):
                    workflowList.append(self.getHermesWorkflowFromJSON(workflowName, name=workflowName))
                else:
                    # Fix: typo in the error message ("Cannog" -> "Cannot").
                    err = f"Cannot find simulations for {workflowName}"
                    logger.error(err)
            else:
                groupworkflowList = [workflow(simulationDoc['desc']['workflow'], WD_path=self.FilesDirectory,
                                              name=simulationDoc.desc[self.DESC_WORKFLOWNAME], resource=simulationDoc['resource']) for simulationDoc in
                                     simulationList]
                workflowList += groupworkflowList

        res = self.compareWorkflowObj(workflowList, longFormat=longFormat)

        return res.T if transpose else res


    def compareWorkflowInGroup(self, workflowGroup, longFormat=False, transpose=False):
        """
            Compares all the workflows in the group.

            Each parameter that has a different value across the workgroup is a row;
            each simulation is a column (unless transpose is True).

        Parameters
        ----------
        workflowGroup : str
            The group name.
        longFormat : bool
            If True, return the results in long format rather than in a wide table.
        transpose : bool
            If True return the simulation names as rows

        Returns
        -------
            Pandas with the difference in the parameter names,
            or None when the group has no workflows.
        """
        simulationList = self.getWorkflowDocumentsInGroup(groupName=workflowGroup)
        workflowList = [workflow(simulationDoc['desc']['workflow'], WD_path=self.FilesDirectory,
                                 name=simulationDoc.desc[self.DESC_WORKFLOWNAME], resource=simulationDoc['resource']) for simulationDoc in simulationList]
        if len(workflowList) == 0:
            ret = None
        else:
            # Fix: the comparison was unconditionally transposed (.T) before the
            # transpose flag was applied, inverting the flag's meaning relative
            # to compareWorkflows and to this docstring.
            res = self.compareWorkflowObj(workflowList, longFormat=longFormat)
            ret = res.T if transpose else res

        return ret

    def deleteWorkflowInGroup(self, workflowGroup, deepDelete=False, resetCounter=True, exclude=None):
        """
            Deletes all the workflows in the group.

        Parameters
        ----------
        workflowGroup : str
            The name of the workgroup.

        deepDelete: bool [default = False]
            If true, also delete the resources from the disk.

        resetCounter : bool [default = True]
            Reset the counter of the group.

        exclude : list [default = None]
            Workflow names that should be kept (not deleted).

        Returns
        -------
            None
        """
        # Fix: avoid a mutable default argument; None means "exclude nothing".
        exclude = [] if exclude is None else exclude
        simulationList = self.getWorkflowDocumentsInGroup(groupName=workflowGroup)
        for doc in simulationList:
            if doc['desc']['workflowName'] in exclude:
                continue
            if os.path.exists(doc.resource) and deepDelete:
                # NOTE(review): assumes doc.resource is a directory —
                # shutil.rmtree fails on a plain file; confirm.
                shutil.rmtree(doc.resource)

            doc.delete()

        if resetCounter:
            self.setCounter(counterName=workflowGroup)



    def listWorkflows(self,
                      workflowGroup: str,
                      listNodes: bool = False,
                      listParameters: bool = False) -> list:
        """
            Lists all the simulations in the simulation group (of this project).

            Each returned item holds the workflow name and, optionally, the node
            list and the parameters of the workflow.

        Parameters
        ----------
        workflowGroup : str
            The name of the group.

        listNodes : bool
            If true, add the node list of each workflow to the output.

        listParameters : bool
            If true, add the parameters of each workflow to the output.

        Returns
        -------
            list of dict
            One dict per simulation with key 'workflowName' and, depending on
            the flags, 'nodes' and 'parameters'.

        """
        simulationList = self.getWorkflowDocumentsInGroup(groupName=workflowGroup)
        ret = []
        for simdoc in simulationList:
            val = dict(workflowName=simdoc['desc']['workflowName'])

            if listNodes:
                val['nodes'] = simdoc['desc']['workflow']['workflow']['nodeList']

            if listParameters:
                val['parameters'] = simdoc['desc']['parameters']

            ret.append(val)

        return ret


    def listGroups(self, solver=None, workflowName=True):
        """
            Prints all the simulation groups of the current project, grouped by solver.

        Parameters
        ----------
        solver : str
            The name of the solver of this workflow.
            If None, print all of them.

        workflowName : bool
            If true, also lists all the simulations in that group.

        Returns
        -------
            None
        """
        logger = get_classMethod_logger(self, "listGroups")
        criteria = dict(type=self.DOCTYPE_WORKFLOW)
        logger.info("Getting the groups in the project.")
        if solver is not None:
            logger.info(f"....Using solver {solver}")
            criteria['solver'] = solver

        documents = self.getSimulationsDocuments(**criteria)
        if len(documents) == 0:
            logger.info(f"There are no workflow-groups in project {self.projectName}")
            return

        records = [dict(solver=item['desc']['solver'],
                        workflowName=item['desc']['workflowName'],
                        groupName=item['desc']['groupName']) for item in documents]
        table = pandas.DataFrame(records)

        for (solverType, groupName), groupFrame in table.groupby(["solver", "groupName"]):
            ttl = f"{solverType}"
            print(ttl)
            print("-" * (len(ttl)))
            print(f"\t* {groupName}")
            if workflowName:
                for simName in groupFrame.workflowName.unique():
                    print(f"\t\t + {simName}")


    def workflowTable(self, workflowGroup, longFormat=False, transpose=False):
        """
            Builds the comparison table of all the workflows in the group.

            Each parameter whose value differs across the workgroup appears as a
            row, and each simulation as a column.

            This is an alias of compareWorkflowInGroup.

        Parameters
        ----------
        workflowGroup : str
            The group name.
        longFormat : bool
            If True, return the results in long format rather than in a wide table.
        transpose : bool
            If True return the simulation names as rows

        Returns
        -------
            Pandas with the difference in the parameter names.
        """
        return self.compareWorkflowInGroup(workflowGroup,
                                           longFormat=longFormat,
                                           transpose=transpose)

__init__(projectName: str, filesDirectory: str = None, toolkitName: str = 'hermesWorkflowToolkit', connectionName=None)

Initializes the workflow toolkit.

Parameters:

Name Type Description Default
projectName str

The project that the workflow will be used in.

required
filesDirectory str

The directory to write all the Workflow and the outputs. default is current directory.

None
Source code in hera/simulations/hermesWorkflowToolkit.py
def __init__(self, projectName: str, filesDirectory: str = None, toolkitName: str = "hermesWorkflowToolkit", connectionName=None):
    """
        Initializes the workflow toolkit.

    Parameters
    ----------
    projectName: str
        The project that the workflow will be used i.

    filesDirectory : str
        The directory to write all the Workflow and the outputs. default is current directory.
    """
    super().__init__(projectName=projectName,
                     filesDirectory=filesDirectory,
                     toolkitName=toolkitName,
                     connectionName=connectionName)

listHermesSolverTemplates(solverName)

Returns a list of all the templates that were loaded for that specific solver.

Parameters:

Name Type Description Default
solverName str
required
Source code in hera/simulations/hermesWorkflowToolkit.py
def listHermesSolverTemplates(self, solverName):
    """
        Returns a list of all the templates that were loaded for that specific solver.

    Parameters
    ----------
    solverName : str


    Returns
    -------

    """
    retList = []
    for doc in self.getDataSourceDocumentsList(solver=solverName):
        data = dict(doc.desc)  # ['desc'])
        retList.append(data)

    if len(retList) > 0:
        return pandas.DataFrame(retList).set_index("name")
    else:
        return pandas.DataFrame()

getHermesFlowTemplate(hermesFlowName)

Get a hermes flow template

Parameters:

Name Type Description Default
solverName
required
Source code in hera/simulations/hermesWorkflowToolkit.py
def getHermesFlowTemplate(self, hermesFlowName):
    """
        Get a hermes flow template
    Parameters
    ----------
    solverName

    Returns
    -------

    """
    return self.getDataSourceData(hermesFlowName, desc__component="Flow")

listHermesNodesTemplates()

Returns a list of all the templates that were loaded for that specific solver.

Parameters:

Name Type Description Default
solverName str
required
Source code in hera/simulations/hermesWorkflowToolkit.py
def listHermesNodesTemplates(self):
    """
        Returns a list of all the node templates that were loaded.

    Returns
    -------
    pandas.DataFrame
        Node template descriptors indexed by "nodeName"; an empty frame
        when no node template exists.
    """
    records = [
        {**dict(doc.desc['desc']), 'nodeName': doc.desc['datasourceName']}
        for doc in self.getDataSourceDocumentsList(desc__component="Node")
    ]

    if not records:
        return pandas.DataFrame()
    return pandas.DataFrame(records).set_index("nodeName")

getHermesNodeTemplate(hermesNodeName)

Get a hermes flow template

Parameters:

Name Type Description Default
solverName
required
Source code in hera/simulations/hermesWorkflowToolkit.py
def getHermesNodeTemplate(self, hermesNodeName):
    """
        Get a hermes node template
    Parameters
    ----------
    hermesNodeName : str
        The name of the node-template datasource to retrieve.

    Returns
    -------
        The datasource data of the requested node template
        (queried with desc__component="Node").
    """
    return self.getDataSourceData(hermesNodeName, desc__component="Node")

getHemresWorkflowFromDocument(documentList, returnFirst=True)

Return a hermes-workflow (or a list of hermes Workflow) to the user.

Parameters:

Name Type Description Default
documentList (list, document)

A hera.datalayer document or a list of documents.

required
returnFirst bool

If true, return obj of only the first item in the list (if it is a list).

True

Returns:

Type Description
hermes workflow object (or one of its derivatives).
Source code in hera/simulations/hermesWorkflowToolkit.py
def getHemresWorkflowFromDocument(self, documentList, returnFirst=True):
    """
        Return a hermes-workflow (or a list of hermes Workflow) to the user.

    Parameters
    ----------
    documentList : list, document
        A hera.datalayer document or a list of documents.

    returnFirst : bool
        If true, return obj of only the first item in the list (if it is a list).

    Returns
    -------
        hermes workflow object (or one of its derivatives); a list of them
        when returnFirst is False.

    Raises
    ------
    ValueError
        If returnFirst is True and documentList is empty.
    """
    logger = get_classMethod_logger(self, "getHemresWorkflowFromDocument")

    docList = documentList if isinstance(documentList, list) else [documentList]

    def _toWorkflow(doc):
        # Rebuild the hermes workflow object from the stored JSON, name and resource.
        return self.getHermesWorkflowFromJSON(doc.desc['workflow'],
                                              name=doc.desc['workflowName'],
                                              resource=doc['resource'])

    if returnFirst:
        if len(docList) == 0:
            # Previously this only logged and then crashed with an opaque
            # IndexError on docList[0]; raise a clear error instead.
            err = "can't get first hermes workflow since documentList is empty"
            logger.error(err)
            raise ValueError(err)
        return _toWorkflow(docList[0])

    return [_toWorkflow(doc) for doc in docList]

getHermesWorkflowFromJSON(workflow: Union[dict, str], name=None, resource=None)

Creates a hermes workflow object from the JSON that is supplied.

The JSON can be either file name, JSON string or a dictionary.

Parameters:

Name Type Description Default
workflow Union[dict, str]

The

required
simulationType str

The type of the workflow to create.

required

Returns:

Type Description
hermesWorkflow object.
Source code in hera/simulations/hermesWorkflowToolkit.py
def getHermesWorkflowFromJSON(self, workflow: Union[dict, str], name=None, resource=None):
    """
        Creates a hermes workflow object from the JSON that is supplied.

        The JSON can be either file name, JSON string or a dictionary.

    Parameters
    ----------
    workflow: dict,str
        The workflow definition: a file name, a JSON string, or a dict.

    name : str, optional
        The name to give the workflow object.

    resource : str, optional
        The resource path passed to the workflow constructor (Resource_path).

    Returns
    -------
        hermesWorkflow object.

    Raises
    ------
    ValueError
        If no workflow class can be located for the requested solver.
    """
    logger = get_classMethod_logger(self, "getHermesWorkflowFromJSON")
    # loadJSON handles multiple input types: file path, JSON string, or dict.
    workFlowJSON = loadJSON(workflow)

    # Dynamic class resolution: the 'solver' field determines which workflow
    # class to instantiate. If no solver is specified, use the generic
    # hermes.workflow base class. Otherwise, resolve a solver-specific
    # subclass (e.g. hera.simulations.openFoam.OFWorkflow.workflow_simpleFoam).
    # pydoc.locate dynamically imports and returns the class.
    ky = workFlowJSON['workflow'].get('solver', None)

    if ky is None:
        hermesWFObj = pydoc.locate("hermes.workflow")
    else:
        hermesWFObj = pydoc.locate(f"hera.simulations.openFoam.OFWorkflow.workflow_{ky}")

    if hermesWFObj is None:
        err = f"The workflow type {ky} not found"
        logger.error(err)
        raise ValueError(err)

    return hermesWFObj(workFlowJSON, name=name, Resource_path=resource)

updateDocumentWorkflow(document, workflow)

Update a simulation document's workflow and parameters in the database.

Parameters:

Name Type Description Default
document MetadataFrame

The simulation document to update.

required
workflow workflow

The workflow object whose JSON and parameters will be stored.

required
Source code in hera/simulations/hermesWorkflowToolkit.py
def updateDocumentWorkflow(self, document, workflow):
    """Update a simulation document's workflow and parameters in the database.

    Parameters
    ----------
    document : MetadataFrame
        The simulation document to update.
    workflow : hermes.workflow
        The workflow object whose JSON and parameters will be stored.
    """
    description = document.desc
    description['workflow'] = workflow.json
    description['parameters'] = workflow.parametersJSON
    document.save()

getHermesWorkflowFromDB(nameOrWorkflowFileOrJSONOrResource: Union[dict, str, list, workflow], returnFirst=True, **query)

    Retrieve Workflow from the DB as hermes.workflow objects (or its derivatives).

If the workflow is string, use it as a name. If the workflow is dict,
use it as a filter on the parameters

    If returnFirst is False, return a list with all the results of the query.
    else, returns a single hermesworkflow.

Parameters:

Name Type Description Default
workflow

The filtering criteria. Either name, or the parameters of the flow.

required
returnFirst bool

If true, return only the first object (if found several results in the DB)

True
query

Additional query criteria.

{}

Returns:

Type Description
list (returnFirst is False)

hermes workflow.

Source code in hera/simulations/hermesWorkflowToolkit.py
def getHermesWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource: Union[dict, str, list, workflow],
                            returnFirst=True, **query):
    """
        Retrieve Workflow from the DB as hermes.workflow objects (or its derivatives).

        A string input is used as a name; a dict input is used as a filter
        on the parameters.

    Parameters
    ----------
    nameOrWorkflowFileOrJSONOrResource: str, dict
        The filtering criteria. Either name, or the parameters of the flow.

    returnFirst : bool
        If true, return only the first object (if found several results in the DB)

    query: arguments
        Additional query criteria.

    Returns
    -------
        A single hermes workflow when returnFirst is True, otherwise a list;
        None when nothing matched.
    """
    logger = get_classMethod_logger(self, "getHermesWorkflowFromDB")
    matchingDocuments = self.getWorkflowListDocumentFromDB(nameOrWorkflowFileOrJSONOrResource, **query)
    if len(matchingDocuments) == 0:
        logger.error(f"... not found. ")
        return None
    return self.getHemresWorkflowFromDocument(documentList=matchingDocuments, returnFirst=returnFirst)

getWorkflowDocumentByName(name, doctype=None, dockind='Simulations', **query)

Retrieve the simulation using only the name.

For a more sophisticated retrieval (that tries to retrieve by group name and by the content of the workflow)  use getWorkflowDocumentFromDB

Parameters:

Name Type Description Default
name str

The name of the workflow.

required
doctype

document type.

None
dockind

Whether the document is cached or a Simulation.

'Simulations'
query dict

Additional criteria.

{}
Source code in hera/simulations/hermesWorkflowToolkit.py
def getWorkflowDocumentByName(self,name,doctype=None, dockind="Simulations",**query):
    """
        Retrieve the simulation using only the name.

        For a more sophisticated retrieval (that tries to retrieve by group name and by the content of the workflow)  use getWorkflowDocumentFromDB
    Parameters
    ----------
    name : str
        The name of the workflow.

    doctype  : string
        document type. Defaults to self.DOCTYPE_WORKFLOW when None.

    dockind  : string
        Whether the document is cached or Simulation.

    query : dict
        Additional criteria.

    Returns
    -------
        The first matching document, or None when nothing matched.
    """
    # Fix: the logger was created under the wrong method name
    # ("getWorkflowDocumentFromDB"), which misattributed log records.
    logger = get_classMethod_logger(self, "getWorkflowDocumentByName")
    doctype = self.DOCTYPE_WORKFLOW if doctype is None else doctype
    # try to find it as a name
    mongo_crit = dictToMongoQuery(query)

    # Dynamic dispatch to getSimulationsDocuments / getCacheDocuments etc.
    retrieve_func = getattr(self, f"get{dockind}Documents")

    logger.info(f"Searching for {name} as a name of kind {dockind}")
    docList = retrieve_func(workflowName=name, type=doctype, **mongo_crit)
    return None if len(docList) == 0 else docList[0]

getWorkflowDocumentFromDB(nameOrWorkflowFileOrJSONOrResource, doctype=None, dockind='Simulations', **query)

Tries to find item as name, workflow directory , groupname or through the resource.
Additional queries are also applicable.

Parameters:

Name Type Description Default
nameOrWorkflowFileOrJSONOrResource string or dict
The name/dict that defines the item
required
doctype

document type.

None
dockind

Whether the document is cached or a Simulation.

'Simulations'
query dict

Additional criteria.

{}

Returns:

Type Description
doc or empty list if not found.
Source code in hera/simulations/hermesWorkflowToolkit.py
def getWorkflowDocumentFromDB(self, nameOrWorkflowFileOrJSONOrResource, doctype=None, dockind="Simulations",**query):
    """
        Tries to find item as name, workflow directory , groupname or through the resource.
        Additional queries are also applicable.

    Parameters
    ----------
    nameOrWorkflowFileOrJSONOrResource : string or dict
            The name/dict that defines the item
    doctype  : string
        document type. Defaults to self.DOCTYPE_WORKFLOW when None.

    dockind  : string
        Whether the document is cached or Simulation.

    query : dict
        Additional criteria.
    Returns
    -------
        doc or empty list if not found.
    """
    logger = get_classMethod_logger(self, "getWorkflowDocumentFromDB")
    doctype = self.DOCTYPE_WORKFLOW if doctype is None else doctype
    mongo_crit = dictToMongoQuery(query)

    # Dynamic dispatch: retrieve from Simulations or Cache collection
    # based on dockind parameter ("Simulations" or "Cache").
    retrieve_func = getattr(self, f"get{dockind}Documents")

    if isinstance(nameOrWorkflowFileOrJSONOrResource, str):
        # Cascading search strategy for string inputs:
        # Try each identification method in order, stopping at first match.
        # 1) workflowName — exact name match (e.g. "flow_0001")
        logger.info(f"Searching for {nameOrWorkflowFileOrJSONOrResource} as a name of kind {dockind}")
        docList = retrieve_func(workflowName=nameOrWorkflowFileOrJSONOrResource, type=doctype, **mongo_crit)
        if len(docList) == 0:
            # 2) resource — match by file/directory path
            logger.info(f"Searching for {nameOrWorkflowFileOrJSONOrResource} as a resource of kind {dockind}.")
            docList = retrieve_func(resource=nameOrWorkflowFileOrJSONOrResource, type=doctype, **mongo_crit)
            if len(docList) == 0:
                # 3) groupName — match all workflows in a group (e.g. "flow")
                logger.info(
                    f"Searching for {nameOrWorkflowFileOrJSONOrResource} as a workflow group of kind {dockind}.")
                docList = retrieve_func(groupName=nameOrWorkflowFileOrJSONOrResource, type=doctype, **mongo_crit)
                if len(docList) == 0:
                    # 4) JSON content — try to parse string as JSON file path
                    # or JSON string, extract parameters, and query by parameter values.
                    # This enables finding a workflow by its content rather than name.
                    logger.info(f"... not found. Try to query as a json. ")
                    try:
                        jsn = loadJSON(nameOrWorkflowFileOrJSONOrResource)
                        wf = self.getHermesWorkflowFromJSON(jsn, resource=nameOrWorkflowFileOrJSONOrResource)
                        # Flatten the workflow parameters to MongoDB query format
                        # using dictToMongoQuery with "parameters" prefix.
                        currentQuery = dictToMongoQuery(wf.parametersJSON, prefix="parameters")
                        currentQuery.update(mongo_crit)
                        # Fix: honor the requested doctype (was hard-coded to
                        # self.DOCTYPE_WORKFLOW, silently ignoring the argument).
                        docList = retrieve_func(type=doctype, **currentQuery)
                    except ValueError as e:
                        err = f"Error {e} when trying to load as JSON."
                        logger.error(err)
                        docList = []
                    except IsADirectoryError:
                        logger.debug(f"not found")
                        docList = []
                else:
                    logger.info(f"... Found it as workflow group ")
            else:
                logger.info(f"... Found it as resource ")
        else:
            logger.info(f"... Found it as name")

    elif isinstance(nameOrWorkflowFileOrJSONOrResource, dict) or isinstance(nameOrWorkflowFileOrJSONOrResource,
                                                                            workflow):
        if isinstance(nameOrWorkflowFileOrJSONOrResource, workflow):
            qryDict = nameOrWorkflowFileOrJSONOrResource.parametersJSON
        else:
            wrkflw = self.getHermesWorkflowFromJSON(nameOrWorkflowFileOrJSONOrResource)
            qryDict = wrkflw.parametersJSON

        logger.debug(f"Searching for {qryDict} using parameters")
        currentQuery = dictToMongoQuery(qryDict, prefix="parameters")
        currentQuery.update(mongo_crit)
        # Fix: honor the requested doctype here as well.
        docList = retrieve_func(**currentQuery, type=doctype)
    else:
        docList = []

    return docList

getWorkflowListDocumentFromDB(nameOrWorkflowFileOrJSONOrResource: Union[dict, str, list, workflow], **query)

Returns the simulation document from the DB.
The nameOrWorkflowFileOrJSONOrResource can be either group name

Identify the simulation from :
 - Resource (the directory name)
 - Simulation name
 - Its workflow
 - workflow dict.

Return the first item that was found.

Parameters:

Name Type Description Default
nameOrWorkflowFileOrJSONOrResource Union[dict, str, list, workflow]
required
Can be
  • Resource (the directory name)
  • Simulation name
  • Its workflow
  • workflow dict.
required
query dict

Additional query criteria for the DB.

{}

Returns:

Type Description
A document, or None if not found. .
Source code in hera/simulations/hermesWorkflowToolkit.py
def getWorkflowListDocumentFromDB(self, nameOrWorkflowFileOrJSONOrResource: "Union[dict, str, list, workflow]",
                                  **query):
    """
        Returns the simulation documents from the DB.

        Identify the simulation from :
         - Resource (the directory name)
         - Simulation name
         - Its workflow
         - workflow dict.

        A list input is handled item by item and the results are concatenated.

    Parameters
    ----------
    nameOrWorkflowFileOrJSONOrResource: str, dict, list, workflow
        Can be
         - Resource (the directory name)
         - Simulation name
         - Its workflow
         - workflow dict
         - a list of any of the above.

    query : dict
        Additional query criteria for the DB.

    Returns
    -------
        A list of matching documents (possibly empty).
    """
    # The annotation is a string (forward reference) so the signature does not
    # evaluate the optional `workflow` type at definition time.
    if isinstance(nameOrWorkflowFileOrJSONOrResource, list):
        docList = []
        for simulationItem in nameOrWorkflowFileOrJSONOrResource:
            # Fix: forward **query; it was accepted and documented but dropped.
            docList += self.getWorkflowDocumentFromDB(simulationItem, **query)
    else:
        docList = self.getWorkflowDocumentFromDB(nameOrWorkflowFileOrJSONOrResource, **query)

    return docList

getWorkflowListOfSolvers(solverName: str, **query)

Returns all the documents of the requested solver.

Parameters:

Name Type Description Default
solverName str

The name of the solver

required
query param - list

Additional query

{}

Returns:

Type Description
list of documnts.
Source code in hera/simulations/hermesWorkflowToolkit.py
def getWorkflowListOfSolvers(self, solverName: str, **query):
    """
        Return all the workflow documents that belong to the requested solver.

    Parameters
    ----------
    solverName : str
        The name of the solver.
    query : param-list
        Additional query criteria.

    Returns
    -------
        list of documents.
    """
    documentType = self.DOCTYPE_WORKFLOW
    return self.getSimulationsDocuments(solver=solverName, type=documentType, **query)

getWorkflowDocumentsInGroup(groupName: str, **kwargs)

Return a list of all the simulations with the name as a prefix, and of the requested simulationType.
Returns the list of the documents.

If the simuationType is None use the default simuationType (WORKFLOW).

Parameters:

Name Type Description Default
groupName str

The prefix name of all the runs.

required
simulationType str[optional]

The type of the workflow. if None, return all.

required
kwargs
Use mongodb criteria.
{}

Returns:

Type Description
list of mongo documents.
Source code in hera/simulations/hermesWorkflowToolkit.py
def getWorkflowDocumentsInGroup(self, groupName: str, **kwargs):
    """
        Return the workflow documents whose group name matches the requested prefix.

    Parameters
    ----------
    groupName : str
        The prefix name of all the runs.

    kwargs: additional filtering criteria.
            Use mongodb criteria.

    Returns
    -------
        list of mongo documents.
    """
    documentType = self.DOCTYPE_WORKFLOW
    return self.getSimulationsDocuments(groupName=groupName, type=documentType, **kwargs)

findAvailableName(simulationGroup: str, **kwargs)

Finds the next available name of that prefix. The available name is the maximal ID + 1.

we assume that the name of each run is:
    <prefix name>_<id>.

Parameters:

Name Type Description Default
simulationGroup str

The simulation group

required
simulationType str

The type of the workflow.

required
**kwargs dict
additional fileters.
Note that these should be the full path of the parameter in the JSON.
{}

Returns:

Type Description
int,str

The new ID, The name.

Source code in hera/simulations/hermesWorkflowToolkit.py
def findAvailableName(self, simulationGroup: str, **kwargs):
    """
        Finds the next available name for the group. The available name is
        derived from the per-group counter (maximal ID + 1).

        We assume that the name of each run is:
            <prefix name>_<id>.

    Parameters
    ----------
    simulationGroup : str
        The simulation group

    **kwargs : dict
            additional filters.
            Note that these should be the full path of the parameter in the JSON.

    Returns
    -------
        int,str
        The new ID,
        The name.
    """
    counterKey = f"simulations_{simulationGroup}"
    newID = self.getCounterAndAdd(counterKey)
    newName = self.getworkFlowName(simulationGroup, newID)
    return newID, newName

getworkFlowName(baseName, flowID) staticmethod

Returns the name of the flow field from the base and the
flow id.

The name is <base>_<id>
where <id> is padded.

Parameters:

Name Type Description Default
baseName str
The base name
required
flowID
the id of the name.
required
Source code in hera/simulations/hermesWorkflowToolkit.py
@staticmethod
def getworkFlowName(baseName, flowID):
    """
        Returns the name of the flow field from the base and the
        flow id.

        The name is <base>_<id>
        where <id> is padded.

    Parameters
    ----------
    baseName : str
            The base name
    flowID: int
            the id of the name.

    Returns
    -------

    """
    formatted_number = "{0:04d}".format(flowID)
    return f"{baseName}_{formatted_number}"

splitWorkflowName(workflow_name: str) staticmethod

splits workflow name into base name and the flow id.

Returns:

Type Description
base name and the flow id, both as strings
Source code in hera/simulations/hermesWorkflowToolkit.py
@staticmethod
def splitWorkflowName(workflow_name:str):
    """splits workflow name into base name and the flow id.

    Returns
    -------
    base name and the flow id, both as strings

    """
    if "_" in workflow_name:
        split_name = workflow_name.split("_")
        return "".join(split_name[:-1]), split_name[-1]
    return workflow_name, None

addWorkflowFileInGroup(workflowFilePath, write_file=False)

adds the workflow to the database and assigning group based on the name

Parameters:

Name Type Description Default
workflowFilePath str
required
write_file bool
False
Source code in hera/simulations/hermesWorkflowToolkit.py
def addWorkflowFileInGroup(self, workflowFilePath, write_file=False):
    """Add a workflow file to the database, deriving its group from the file name.

    Parameters
    ----------
    workflowFilePath : str
        Path to the workflow file; must reside under self.FilesDirectory.
    write_file : bool
        Forwarded to addWorkflowToGroup as writeWorkflowToFile.

    Returns
    -------
        The workflow document (the existing one if already in the DB,
        otherwise the newly added one).
    """
    get_classMethod_logger(self, "addWorkflowFileInGroup")
    if workflow is None:
        raise NotImplementedError("addWorkflowFileInGroup() requires the 'hermes' library, which is not installed")

    workflowName = os.path.splitext(os.path.basename(workflowFilePath))[0]
    if not os.path.abspath(workflowFilePath).startswith(self.FilesDirectory):
        raise ValueError(f"{os.path.abspath(workflowFilePath)} is not in {self.FilesDirectory}")

    existingDoc = self.getWorkflowDocumentByName(workflowName)
    if existingDoc is not None:
        return existingDoc

    groupName = self.splitWorkflowName(workflowName)[0]
    return self.addWorkflowToGroup(workflowFilePath, groupName,
                                   writeWorkflowToFile=write_file,
                                   resource=workflowFilePath)

addWorkflowToGroup(workflowJSON: str, groupName: str, writeWorkflowToFile: bool = False, resource=None)

Adds the workflow to the database, or updates an existing document.

If the fullName is true:
    The groupOrFullName is the name of the simulation.
    Hence, try to get the name from the DB. If the document exists update it with the
    new data if the overwrite is True.
If the record is not in the DB, add it with that name.

If the fullName is False:
    Check if the workflow is in the DB. If it is not,
    generate a new name with the counter and add it.

Parameters:

Name Type Description Default
workflowJSON str

The file name that contains the workflow.

required
groupName str

The group to assign the workflow to.

required
fullName

`Treat the groupOrFullName as a full name if True.

required
overwrite bool

If true, then update the json workflow in the document if it exists.

required
writeWorkflowToFile bool

If true, then write the JSON file to the disk.

False

Returns:

Type Description
The document of the new workflow.
Source code in hera/simulations/hermesWorkflowToolkit.py
def addWorkflowToGroup(self, workflowJSON: str, groupName: str, writeWorkflowToFile:bool=False, resource=None):
    """
        Adds the workflow to the database, or returns the existing document.

        If a workflow with the same parameter values already exists in the
        project, its document is returned and no duplicate is created.
        Otherwise a new name <groupName>_<padded id> is generated from the
        per-group counter and a new document is added.

    Parameters
    ----------
    workflowJSON : str
        The file name that contains the workflow.

    groupName : str
        The group to assign the workflow to.

    writeWorkflowToFile : bool
        If true, then write the JSON file to the disk.

    resource : str, optional
        The resource path stored in the document. When None, defaults to
        <FilesDirectory>/<workflowName>.json.

    Returns
    -------
        The document of the new (or existing) workflow.
    """
    logger = get_classMethod_logger(self, "addWorkflowToGroup")
    if workflow is None:
        # Fix: corrected message typo ("is nor installed").
        raise NotImplementedError("addWorkflowToGroup() requires the 'hermes' library, which is not installed")

    # Idempotent add: first check if this exact workflow already exists
    # in the DB (matched by its parameter values). If found, return the
    # existing document without creating a duplicate.
    workflowData = loadJSON(workflowJSON)
    docList = self.getWorkflowDocumentFromDB(workflowData)
    if len(docList) > 0:
        logger.info(f"...Found. Returning the document.")
        doc = docList[0]
    else:
        # New workflow: generate a unique name using the group counter.
        # Naming convention: <groupName>_<padded_id> (e.g. "flow_0001").
        # The counter is per-group, stored in the project config.
        logger.info("...Not Found, adding the input to the DB")
        groupID = self.getCounterAndAdd(groupName)
        workflowName = self.getworkFlowName(groupName, groupID)
        resource = os.path.abspath(resource) if resource else (os.path.join(self.FilesDirectory, workflowName) + ".json")
        hermesWF = workflow(workflowData, Resource_path=resource)
        # Store the full workflow JSON + extracted parameters in the document.
        # The parameters are stored separately to enable efficient querying
        # via dictToMongoQuery without parsing the full workflow tree.
        doc = self.addSimulationsDocument(resource=resource,
                                          dataFormat=datatypes.STRING,
                                          type=self.DOCTYPE_WORKFLOW,
                                          desc=dict(
                                              groupName=groupName,
                                              groupID=groupID,
                                              workflowName=workflowName,
                                              solver=hermesWF.solver,
                                              workflow=hermesWF.json,
                                              parameters=hermesWF.parametersJSON)
                                          )

    if writeWorkflowToFile:
        workflowName = self.getworkFlowName(doc.desc['groupName'], doc.desc['groupID'])
        resource = os.path.abspath(resource) if resource else (os.path.join(self.FilesDirectory, workflowName) + ".json")
        hermesWF = workflow(doc.desc['workflow'], WD_path=self.FilesDirectory, Resource_path=resource)
        with open(os.path.join(resource), "w") as outFile:
            json.dump(hermesWF.json, outFile, indent=4)


    return doc

executeWorkflowFromDB(nameOrWorkflowFileOrJSONOrResource)

Building and Executing the workflow.

Note that it is only executing the workflow.
For OpenFOAM simulations, use the runOFSimulation method that will build the case
and then run it.

Note the procedure removes the [name]_targetFiles

Parameters:

Name Type Description Default
Can be
  • Resource (the directory name)
  • Simulation name
  • Its workflow
  • workflow dict.
required
build bool [default = True]

If true, also builds the workflow.

required

Returns:

Type Description
None
Source code in hera/simulations/hermesWorkflowToolkit.py
def executeWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource):
    """
        Building and Executing the workflow.

        Note that it is only executing the workflow.
        For OpenFOAM simulations, use the runOFSimulation method that will build the case
        and then run it.

        Note the procedure removes the [name]_targetFiles directory before running,
        so all tasks re-execute from scratch.

    Parameters
    ----------
    nameOrWorkflowFileOrJSONOrResource: str, dict
        Identifies the workflow(s) to execute. Can be
         - Resource (the directory name)
         - Simulation name
         - Its workflow
         - workflow dict.

    Returns
    -------
        None
    """
    logger = get_classMethod_logger(self, "executeWorkflow")
    docList = self.getWorkflowListDocumentFromDB(nameOrWorkflowFileOrJSONOrResource)

    for doc in docList:
        workflowJSON = doc.desc['workflow']
        workflowName = doc.desc['workflowName']
        logger.info(f"Processing {workflowName}")

        # Step 1: Reconstruct the hermes workflow object from the stored JSON.
        # The workflow class is resolved dynamically via pydoc.locate based on
        # the 'solver' field (generic hermes.workflow or solver-specific subclass).
        hermesWF = self.getHermesWorkflowFromJSON(workflowJSON, name=workflowName, resource=doc['resource'])

        # Step 2: Build the workflow into a Luigi task DAG.
        # hermes.build() traverses the workflow node tree, wraps each node in a
        # Luigi task, and returns the Python source code for the task module.
        logger.info(f"Building and executing the workflow {workflowName}")
        build = hermesWF.build(buildername=workflow.BUILDER_LUIGI)

        # Step 3: Write the workflow JSON and generated Python module to disk.
        # The JSON is written to the resource path; the Python module contains
        # the Luigi task definitions that will be executed.
        logger.info(f"Writing the workflow and the executer python {workflowName}")
        wfFileName = hermesWF.Resource_path
        hermesWF.write(wfFileName)

        pythonFileName = os.path.join(self.FilesDirectory, f"{workflowName}.py")
        with open(pythonFileName, "w") as outFile:
            outFile.write(build)

        # Step 4: Clean previous execution artifacts (Luigi target files).
        # Luigi uses target files to track task completion. Removing them
        # forces all tasks to re-execute from scratch.
        logger.debug(f"Removing the targetfiles and execute")
        executionfileDir = os.path.join(self.FilesDirectory, f"{workflowName}_targetFiles")
        shutil.rmtree(executionfileDir, ignore_errors=True)

        # Step 5: Execute the Luigi pipeline via command line.
        # 'finalnode_xx_0' is the terminal task that triggers the full DAG.
        # --local-scheduler avoids requiring a separate Luigi scheduler process.
        pythonPath = os.path.join(self.FilesDirectory, f"{workflowName}")
        executionStr = f"python3 -m luigi --module {os.path.basename(pythonPath)} finalnode_xx_0 --local-scheduler"
        logger.debug(executionStr)
        os.system(executionStr)

        # Step 6: Clean up the generated Python module (the workflow JSON stays).
        logger.info(f"Cleaning the executer python for {workflowName}")
        os.remove(pythonFileName)

compareWorkflowObj(workflowList, longFormat: bool = False)

Compares the parameters of the Workflow to each other.

Parameters:

Name Type Description Default
workflowList list of hermes workflow objects
The list of Workflow to compare.
required
Source code in hera/simulations/hermesWorkflowToolkit.py
def compareWorkflowObj(self,
                       workflowList,
                       longFormat: bool = False):
    """
        Compares the parameters of the given workflow objects to each other.

    Parameters
    ----------
    workflowList : list of hermes workflow objects
            The workflows whose parameters are compared.
    longFormat : bool
            If True, return the comparison in long format.

    Returns
    -------
        The comparison table produced by compareJSONS.
    """
    # Map each workflow's name to its parameters JSON and delegate the diffing.
    namedParameters = {wf.name: wf.parametersJSON for wf in workflowList}
    return compareJSONS(**namedParameters,
                        longFormat=longFormat,
                        changeDotToUnderscore=True)

compareWorkflows(Workflow: Union[list, str], longFormat: bool = False, transpose: bool = False) -> Union[dict, pandas.DataFrame]

Compares two or more hermes Workflow.

Parameters:

Name Type Description Default
Workflow (str, list)
A single input uses it as a group name,
a list is the list of Workflow names to compare.
required
longFormat
If true, return the results in long format rather than in a wide table.
False
transpose
If true, transpose the resulting table.
False

Returns:

Type Description
pandas.DataFrame, json (depends on the input flags).

Return the differences between the parameters of the requested Workflow.

Source code in hera/simulations/hermesWorkflowToolkit.py
def compareWorkflows(self,
                     Workflow: Union[list, str],
                     longFormat: bool = False,
                     transpose: bool = False) -> Union[dict, pandas.DataFrame]:
    """
        Compares two or more hermes Workflow.

        Each workflow name is resolved first against the DB; when not found
        there, it is treated as a JSON file on disk.

    Parameters
    ----------
    Workflow : str,list
            A single input uses it as a group name,
            a list is the list of Workflow names to compare.

    longFormat: bool
            If true, return the results in long format rather than in a wide table.

    transpose: bool
            If true, transpose the resulting table.

    Returns
    -------
        pandas.DataFrame, json (depends on the input flags).
        Return the differences between the parameters of the requested Workflow.

    Raises
    ------
    NotImplementedError
        When Workflow is None (set upstream when the hermes library is unavailable).
    """
    logger = get_classMethod_logger(self, "compareWorkflows")
    if Workflow is None:
        # Workflow is None when the hermes library could not be imported upstream.
        raise NotImplementedError("compare() requires the 'hermes' library, which is not installed")
    logger.info("--- Start ---")

    workflowList = []
    for workflowName in numpy.atleast_1d(Workflow):
        simulationList = self.getWorkflowListDocumentFromDB(workflowName)
        if len(simulationList) == 0:
            # Not in the DB: fall back to interpreting the name as a JSON file on disk.
            if os.path.isfile(workflowName):
                workflowList.append(self.getHermesWorkflowFromJSON(workflowName, name=workflowName))
            else:
                err = f"Cannot find simulations for {workflowName}"
                logger.error(err)
        else:
            groupworkflowList = [workflow(simulationDoc['desc']['workflow'], WD_path=self.FilesDirectory,
                                          name=simulationDoc.desc[self.DESC_WORKFLOWNAME], resource=simulationDoc['resource']) for simulationDoc in
                                 simulationList]
            workflowList += groupworkflowList

    res = self.compareWorkflowObj(workflowList, longFormat=longFormat)

    return res.T if transpose else res

compareWorkflowInGroup(workflowGroup, longFormat=False, transpose=False)

Compares all the Workflow in the group name

Each parameter that has different value across the workgroup is in the row, each simulation
is in the column.

Parameters:

Name Type Description Default
workflowGroup str

The group name.

required
longFormat bool

If True, return the results in long format rather than in a wide table.

False
transpose bool

If True return the simulation names as rows

False

Returns:

Type Description
Pandas with the difference in the parameter names.
Source code in hera/simulations/hermesWorkflowToolkit.py
def compareWorkflowInGroup(self, workflowGroup, longFormat=False, transpose=False):
    """
        Compares all the Workflow in the group name.

        Each parameter that has different value across the workgroup is in the row, each simulation
        is in the column.

    Parameters
    ----------
    workflowGroup : str
        The group name.
    longFormat : bool
        If True, return the results in long format rather than in a wide table.
    transpose : bool
        If True return the simulation names as rows

    Returns
    -------
        Pandas with the difference in the parameter names, or None when the
        group holds no workflows.
    """
    documents = self.getWorkflowDocumentsInGroup(groupName=workflowGroup)
    workflows = [workflow(doc['desc']['workflow'], WD_path=self.FilesDirectory,
                          name=doc.desc[self.DESC_WORKFLOWNAME], resource=doc['resource'])
                 for doc in documents]

    if not workflows:
        return None

    # NOTE(review): the comparison is transposed once here and possibly again when
    # transpose=True, which inverts the flag's meaning relative to compareWorkflows —
    # confirm which orientation is intended.
    comparison = self.compareWorkflowObj(workflows, longFormat=longFormat).T
    return comparison.T if transpose else comparison

deleteWorkflowInGroup(workflowGroup, deepDelete=False, resetCounter=True, exclude=[])

Deletes all the workflows in the group.

Parameters:

Name Type Description Default
workflowGroup str

The name of the workgroup.

required
deepDelete

If true, delete the resources.

False
resetCounter bool [default = True]

Reset the counter of the group to 1.

True
Source code in hera/simulations/hermesWorkflowToolkit.py
def deleteWorkflowInGroup(self, workflowGroup, deepDelete=False, resetCounter=True, exclude=None):
    """
        Deletes all the workflows in the group.

    Parameters
    ----------
    workflowGroup : str
        The name of the workgroup.

    deepDelete: bool [default = False]
        If true, delete the resources (the run directories on disk) as well.

    resetCounter : bool [default = True]
        Reset the counter of the group to 1.

    exclude : list [default = None]
        Workflow names that should NOT be deleted. None means delete everything.

    Returns
    -------
        None
    """
    # Normalize here instead of using a mutable default argument (exclude=[]),
    # which is shared across calls in Python.
    exclude = [] if exclude is None else exclude

    simulationList = self.getWorkflowDocumentsInGroup(groupName=workflowGroup)
    for doc in simulationList:
        if doc['desc']['workflowName'] in exclude:
            continue
        # Check the cheap flag first; only touch the filesystem when deepDelete is on.
        if deepDelete and os.path.exists(doc.resource):
            shutil.rmtree(doc.resource)

        doc.delete()

    if resetCounter:
        self.setCounter(counterName=workflowGroup)

listWorkflows(workflowGroup: str, listNodes: bool = False, listParameters: bool = False) -> Union[pandas.DataFrame, dict]

Lists all the simulations in the simulation group (of this project).

Allows additional filters using the simulationType.

If parameters is not None, return the list of parameters.
return the parameters of all the nodes if the paraleters is an empty List, or the requested parameters.
The default behaviour is to return only the parameters that are different from each other, unless allParams
is True.

The output is either pandas.DataFrame (if jsonFormat is False) or a JSON (if JSON is True).

Parameters:

Name Type Description Default
workflowGroup str

The name of the group

required
parametersOfNodes

If None, just return the names of the simulations. Otherwise add the parameters from the requested nodes.

required
allParams

If true, list all the parameters and not just the parameters that were different between the simulations.

required
jsonFormat

If true, return JSON and not a normalized pandas.DataFrame.

required

Returns:

Type Description
pandas.DataFrame or dict

A list of the simulations and their values.

Source code in hera/simulations/hermesWorkflowToolkit.py
def listWorkflows(self,
                  workflowGroup: str,
                  listNodes: bool = False,
                  listParameters: bool = False) -> list:
    """
        Lists all the simulations in the simulation group (of this project).

        Each returned item holds the workflow name and, optionally, the node
        list and/or the parameters of that workflow.

    Parameters
    ----------
    workflowGroup : str
        The name of the group

    listNodes : bool
        If True, add the list of nodes of each workflow to the result.

    listParameters : bool
        If True, add the parameters of each workflow to the result.

    Returns
    -------
        list of dict
        A list of the simulations and their requested details.
    """
    simulationList = self.getWorkflowDocumentsInGroup(groupName=workflowGroup)
    ret = []
    for simdoc in simulationList:
        # Always include the workflow name; the rest is opt-in.
        val = dict(workflowName=simdoc['desc']['workflowName'])

        if listNodes:
            val['nodes'] = simdoc['desc']['workflow']['workflow']['nodeList']

        if listParameters:
            val['parameters'] = simdoc['desc']['parameters']

        ret.append(val)

    return ret

listGroups(solver=None, workflowName=True)

Lists all the simulation groups of the current project.

Parameters:

Name Type Description Default
solver str
    The name of the solver of this workflow.
    If None, print all of them.
None
workflowName bool
    if true, also lists all the simulations in that group.
True
Source code in hera/simulations/hermesWorkflowToolkit.py
def listGroups(self, solver=None, workflowName=True):
    """
        Lists all the simulation groups of the current project.

    Parameters
    ----------
    solver : str
                The name of the solver of this workflow.
                If None, print all of them.

    workflowName : bool
                if true, also lists all the simulations in that group.

    Returns
    -------
        None (the groups are printed to the console).
    """
    logger = get_classMethod_logger(self, "listGroups")
    query = dict(type=self.DOCTYPE_WORKFLOW)
    logger.info("Getting the groups in the project.")
    if solver is not None:
        logger.info(f"....Using solver {solver}")
        query['solver'] = solver

    documents = self.getSimulationsDocuments(**query)
    if len(documents) == 0:
        logger.info(f"There are no workflow-groups in project {self.projectName}")
        return

    records = [dict(solver=doc['desc']['solver'],
                    workflowName=doc['desc']['workflowName'],
                    groupName=doc['desc']['groupName']) for doc in documents]
    table = pandas.DataFrame(records)

    # Print one section per (solver, group) pair, optionally listing the simulations.
    for (solverType, groupName), groupData in table.groupby(["solver", "groupName"]):
        ttl = f"{solverType}"
        print(ttl)
        print("-" * (len(ttl)))
        print(f"\t* {groupName}")
        if workflowName:
            for simName in groupData.workflowName.unique():
                print(f"\t\t + {simName}")

workflowTable(workflowGroup, longFormat=False, transpose=False)

Compares all the Workflow in the group name

Each parameter that has different value across the workgroup is in the row, each simulation
is in the column.

Identical to the method compareWorkflowInGroup.

Parameters:

Name Type Description Default
workflowGroup str

The group name.

required
longFormat bool

If True, return the results in long format rather than in a wide table.

False
transpose bool

If True return the simulation names as rows

False

Returns:

Type Description
Pandas with the difference in the parameter names.
Source code in hera/simulations/hermesWorkflowToolkit.py
def workflowTable(self, workflowGroup, longFormat=False, transpose=False):
    """
        Builds the comparison table of all the Workflow in the group name.

        Each parameter that has different value across the workgroup is in the row, each simulation
        is in the column.

        Identical to the method compareWorkflowInGroup.

    Parameters
    ----------
    workflowGroup : str
        The group name.
    longFormat : bool
        If True, return the results in long format rather than in a wide table.
    transpose : bool
        If True return the simulation names as rows

    Returns
    -------
        Pandas with the difference in the parameter names.
    """
    # Thin alias: delegate everything to compareWorkflowInGroup.
    return self.compareWorkflowInGroup(workflowGroup,
                                       longFormat=longFormat,
                                       transpose=transpose)

Machine Learning

machineLearningDeepLearningToolkit

hera.simulations.machineLearningDeepLearning.toolkit.machineLearningDeepLearningToolkit

Bases: abstractToolkit

The class handles machine/deep learning models.

It helps saving hyper parameters and it provide simple tools (like batch/train splitting).

Notes: * Torch models require pytorch installed. * SciKit-Learn models require scikit-learn installed.

Source code in hera/simulations/machineLearningDeepLearning/toolkit.py
class machineLearningDeepLearningToolkit(abstractToolkit):
    """
        The class handles machine/deep learning models.

        It helps saving hyper parameters and it provides simple
        tools (like batch/train splitting).

        Notes:
            * Torch models require pytorch installed.
            * SkiLearn requires scikitlearn installed.

    """

    def __init__(self, projectName: str, filesDirectory: str = None, connectionName=None):
        """
            Initializes the machineLearning/deepLearning toolkit.

        Parameters
        ----------
        projectName: str
            The project where the models are stored.

        filesDirectory : str
            The directory to write all the Workflow and the outputs. default is current directory.

        connectionName : str
            The DB connection name to use; None selects the default connection.
        """
        super().__init__(projectName=projectName,
                         filesDirectory=filesDirectory,
                         toolkitName= "machineLearningDeepLearningToolkit",
                         connectionName=connectionName)

    def getEmptyTorchModelContainer(self):
        """Return a fresh, unconfigured torch-lightning model container bound to this toolkit."""
        return torchLightingModelContainer(self)

    def getTorchModelFromJSON(self, modelJSON):
        """Return a model container initialized from the given model description JSON."""
        newModel = torchLightingModelContainer(self)
        newModel.modelJSON = modelJSON
        return newModel

    def listTorchModels(self, modelObjectOrName=None, longFormat=True, **qry):
        """
            Lists the torch models in the project, optionally filtered by model
            class (a class object or its full classpath string) and extra query terms.
        """
        qryMongo = dictToMongoQuery(qry)
        if modelObjectOrName is not None:
            if isinstance(modelObjectOrName, str):
                qryMongo["model__model__classpath"] = modelObjectOrName
            else:
                qryMongo["model__model__classpath"] = self.get_model_fullname(modelObjectOrName)
        docList = self.getSimulationsDocuments(type=torchLightingModelContainer.MODEL, **qryMongo)
        return compareJSONS(**dict([(f"{mdl.desc['modelID']}", mdl.desc['model']) for mdl in docList]),
                            longFormat=longFormat, changeDotToUnderscore=True).rename(columns=dict(datasetName="modelID"))

    def getTorchModelContainerByID(self, modelID, **qry):
        """
            Return the loaded model container with the given ID, or None when the
            model is not found in the project.
        """
        qryModngo = dictToMongoQuery(qry, prefix="model")
        docList = self.getSimulationsDocuments(type=torchLightingModelContainer.MODEL, modelID=modelID, **qryModngo)
        if len(docList) > 0:
            mdlDesc = self.getEmptyTorchModelContainer()
            mdlDesc.modelJSON = docList[0].desc['model']
            mdlDesc.load()
        else:
            mdlDesc = None
        return mdlDesc

    def packTorchModelByID(self, modelIDorListID, packFileName):
        """
            Pack one or more models (description JSON and checkpoint data) into an archive.

        Parameters
        ----------
        modelIDorListID : int/list of it
                ID or list of model id
        packFileName: string
                The name of the output file.

        Returns
        -------
            None

        Raises
        ------
        ValueError
            If none of the requested model IDs exist in the project.
        """
        # Exclude str from the iterable branch: a string ID must not be iterated per character.
        if isinstance(modelIDorListID, Iterable) and not isinstance(modelIDorListID, str):
            # getTorchModelContainerByID returns None for a missing ID; drop those.
            modelContainer = [mdl for mdl in (self.getTorchModelContainerByID(x) for x in modelIDorListID)
                              if mdl is not None]
            # BUGFIX: the original raised when models WERE found (`if len(modelContainer):`);
            # raise only when none of the IDs resolved.
            if len(modelContainer) == 0:
                raise ValueError(f"None of the Models ID {modelIDorListID} were found in project {self.projectName}. If this is not the project you meant, make sure caseConfiguration.json exists or that you initialized the toolkit with the desired project name")
        else:
            modelContainer = self.getTorchModelContainerByID(modelIDorListID)

            if modelContainer is None:
                raise ValueError(f"Model {modelIDorListID} not found in project {self.projectName}. If this is not the project you ment, make sure caseConfiguration.json exists or that you initialized the toolkit with the desired project name")

        self.packTorchModel(modelContainer, packFileName)

    def packTorchModel(self, modelContainerOrListContainers, packFileName):
        """
            Pack the given model container(s) into a zip archive: each model's
            description JSON (with machine-local paths stripped) plus its data.

        Parameters
        ----------
        modelContainerOrListContainers : container or list of containers
            The model container(s) to pack.
        packFileName : str
            The name of the output archive file.

        Returns
        -------
            None
        """
        logger = get_classMethod_logger(self, "packModel")
        logger.info("Packing models")
        if not isinstance(modelContainerOrListContainers, Iterable):
            modelContainerOrListContainers = [modelContainerOrListContainers]  # make it iterable.

        itemsToZip = []
        modelsDict = dict()
        for modelContainer in modelContainerOrListContainers:
            logger.info(f"Pack model {modelContainer.modelID}")
            modelDoc = modelContainer.getModelDocument()
            logger.debug("Remove all the locations of the classes. They will be found again when loading")
            descWithoutFilePaths = remove_key_recursively_new(modelDoc.desc, "filepath")
            logger.debug(f"Remove all the prefix of the dataset. We assume that it is relative to the files diction of the project (={self.filesDirectory}")
            descWithoutFilePaths = remove_prefix_from_values(descWithoutFilePaths, "pathToData", self.filesDirectory)
            modelDescStr = json.dumps(descWithoutFilePaths, indent=4)
            modelName = f"{modelContainer.modelName}_{modelContainer.modelID}.json"
            modelsDict[modelName] = modelDescStr
            logger.debug("Pack items")
            itemsToZip.append(modelDoc.getData())

        # The per-model description JSONs go in last, alongside the data directories.
        itemsToZip.append(modelsDict)
        zip_items(packFileName, itemsToZip)

    def loadPackedModel(self, archiveFile, overwrite=False):
        """
            Loads the model to the database, and extracts the runtime data to the directory.

            If the model is in the database, we will overwrite the data if the overwrite is True.
            Otherwise, it will skip.

        Parameters
        ----------
        archiveFile : str
            The archive created by packTorchModel.
        overwrite : bool
            If True, re-extract the data even when the target directory exists.

        Returns
        -------
            None
        """
        logger = get_classMethod_logger(self, "loadPackedModel")
        logger.info("Unpacking models")

        models = list_json_files_in_zip(archiveFile)
        for model in models:
            modelName = model['name']
            logger.info(f"Loading the model {modelName}")
            modelJSON = model['content']

            # Re-resolve the class file paths and data paths for THIS machine.
            modelJSON = self.update_classes_filepath(modelJSON)
            modelJSON = self.append_filesDirectory_to_pathToData(modelJSON)

            modelContainer = self.getTorchModelFromJSON(modelJSON['model'])
            modelContainerDoc = modelContainer.getModelDocument()

            # Check if the directory exists.
            targetDataPath = modelContainerDoc.getData()
            if os.path.exists(targetDataPath) and not overwrite:
                err = f"Model {modelContainer.modelName} with the requested parameters already exists as model {modelContainer.modelID}).  Skipping unpacking since  overwrite flag is flase. If you want to overwrite call with overwrite=True"
                logger.error(err)
                #raise ValueError(err)
            else:
                os.makedirs(targetDataPath, exist_ok=True)
                origmodelName = modelName.split(".")[0]

                with zipfile.ZipFile(archiveFile, 'r') as zip_ref:
                    for member in zip_ref.infolist():
                        memberName = member.filename
                        logger.debug(f"Checking item {memberName} in the archive")

                        # Only the data files of THIS model (not its .json description).
                        if memberName.startswith(origmodelName) and ".json" not in memberName:
                            logger.debug(f"Changing path {origmodelName}->{targetDataPath} and write the file there")

                            # Rebase the archive path onto the target data directory.
                            newNameList = [targetDataPath] + memberName.split(os.path.sep)[1:]
                            newFileName = os.path.join(*newNameList)
                            logger.debug(newFileName)
                            os.makedirs(os.path.dirname(newFileName), exist_ok=True)

                            with zip_ref.open(memberName) as source, open(newFileName, 'wb') as target:
                                target.write(source.read())

    def append_filesDirectory_to_pathToData(self, modelJSON):
        """
            Recursively prepend this project's filesDirectory to every
            "pathToData" string value in the model JSON.

        Parameters
        ----------
        modelJSON : dict (or any JSON value)
            The model description to process; non-dict values pass through unchanged.

        Returns
        -------
            A new structure with every "pathToData" value prefixed by self.filesDirectory.
        """
        if isinstance(modelJSON, dict):
            new_dict = {}
            for k, v in modelJSON.items():
                if k == "pathToData" and isinstance(v, str):
                    new_dict[k] = os.path.join(self.filesDirectory, v)
                elif isinstance(v, dict):
                    new_dict[k] = self.append_filesDirectory_to_pathToData(v)
                elif isinstance(v, list):
                    # BUGFIX: recurse on each list *item*; the original recursed on the
                    # whole list (v), which replaced dict items with the list itself.
                    new_dict[k] = [self.append_filesDirectory_to_pathToData(item) if isinstance(item, dict) else item
                                   for item in v]
                else:
                    new_dict[k] = v
            return new_dict
        else:
            return modelJSON

    ## ====================================================================================================
    ## ====================================================================================================
    ## ===================================== CLASS METHODS ================================================
    ## ====================================================================================================
    ## ====================================================================================================

    @classmethod
    def get_model_fullname(cls, modelCls):
        """Return the full classpath string of the given model class."""
        name, data = cls.get_class_info(modelCls)
        return data['classpath']

    @classmethod
    def get_class_info(cls, modelClsOrName):
        """
            Resolve a model class (object or dotted-path string) to its name,
            classpath and the directory containing its top-level package.
        """
        if isinstance(modelClsOrName, str):
            modelCls = pydoc.locate(modelClsOrName)
            if modelCls is None:
                raise ValueError(f"class {modelClsOrName} not found. Is this pacakge in the classpath. Make sure the directory that contain the code to this class is in the environmental parameter PYTHONPATH. ")
        else:
            modelCls = modelClsOrName
        module = modelCls.__module__
        name = modelCls.__name__
        file_path = inspect.getfile(modelCls)
        file_path = os.path.dirname(os.path.abspath(file_path))

        full_path = f"{module}.{name}"
        patList = file_path.split(os.path.sep)
        moduleNameIndex = patList.index(full_path.split(".")[0])
        patList[0] = '/'
        module_file_path = os.path.join(*patList[:moduleNameIndex])

        return name, dict(classpath=full_path, filepath=module_file_path)

    @classmethod
    def update_classes_filepath(cls, modelJSON):
        """
        Recursively adds the correct files path to all the classpath of the machine.

        Parameters
        ----------
        modelJSON : dictionary to process

        Returns
        -------
            A new structure where every "classpath" entry is accompanied by a
            "filepath" entry valid on this machine.
        """

        if isinstance(modelJSON, dict):
            new_dict = {}
            for k, v in modelJSON.items():
                if k == "classpath" and isinstance(v, str):
                    _, fileData = cls.get_class_info(v)
                    new_dict["classpath"] = v
                    new_dict['filepath'] = fileData['filepath']
                elif isinstance(v, dict):
                    new_dict[k] = cls.update_classes_filepath(v)
                elif isinstance(v, list):
                    # BUGFIX: recurse on each list *item*; the original recursed on the
                    # whole list (v), which replaced dict items with the list itself.
                    new_dict[k] = [cls.update_classes_filepath(item) if isinstance(item, dict) else item
                                   for item in v]
                else:
                    new_dict[k] = v
            return new_dict
        else:
            return modelJSON

__init__(projectName: str, filesDirectory: str = None, connectionName=None)

Initializes the machineLearning/deepLearning toolkit.

Parameters:

Name Type Description Default
projectName str

The project where the models are stored.

required
filesDirectory str

The directory to write all the Workflow and the outputs. default is current directory.

None
Source code in hera/simulations/machineLearningDeepLearning/toolkit.py
def __init__(self, projectName: str, filesDirectory: str = None, connectionName=None):
    """
        Initializes the machineLearning/deepLearning toolkit.

    Parameters
    ----------
    projectName: str
        The project where the models are stored.

    filesDirectory : str
        The directory to write all the Workflow and the outputs. default is current directory.

    connectionName : str
        The database connection name; None selects the default connection.
    """
    # Register this toolkit under its fixed name; all storage goes through the base class.
    super().__init__(
        projectName=projectName,
        filesDirectory=filesDirectory,
        toolkitName="machineLearningDeepLearningToolkit",
        connectionName=connectionName,
    )

packTorchModelByID(modelIDorListID, packFileName)

Pack one or more models, checkpoint and

Parameters:

Name Type Description Default
modelIDorListID int/list of it
ID or list of model id
required
packFileName
The name of the output file.
required
Source code in hera/simulations/machineLearningDeepLearning/toolkit.py
def packTorchModelByID(self, modelIDorListID, packFileName):
    """
        Pack one or more models (description JSON and checkpoint data) into an archive.

    Parameters
    ----------
    modelIDorListID : int/list of it
            ID or list of model id
    packFileName: string
            The name of the output file.

    Returns
    -------
        None

    Raises
    ------
    ValueError
        If none of the requested model IDs exist in the project.
    """
    # Exclude str from the iterable branch: a string ID must not be iterated per character.
    if isinstance(modelIDorListID, Iterable) and not isinstance(modelIDorListID, str):
        # getTorchModelContainerByID returns None for a missing ID; drop those.
        modelContainer = [mdl for mdl in (self.getTorchModelContainerByID(x) for x in modelIDorListID)
                          if mdl is not None]
        # BUGFIX: the original raised when models WERE found (`if len(modelContainer):`);
        # raise only when none of the IDs resolved.
        if len(modelContainer) == 0:
            raise ValueError(f"None of the Models ID {modelIDorListID} were found in project {self.projectName}. If this is not the project you meant, make sure caseConfiguration.json exists or that you initialized the toolkit with the desired project name")
    else:
        modelContainer = self.getTorchModelContainerByID(modelIDorListID)

        if modelContainer is None:
            raise ValueError(f"Model {modelIDorListID} not found in project {self.projectName}. If this is not the project you ment, make sure caseConfiguration.json exists or that you initialized the toolkit with the desired project name")

    self.packTorchModel(modelContainer, packFileName)

packTorchModel(modelContainerOrListContainers, packFileName)

Parameters:

Name Type Description Default
modelContainer
required
Source code in hera/simulations/machineLearningDeepLearning/toolkit.py
def packTorchModel(self, modelContainerOrListContainers, packFileName):
    """
        Pack the given model container(s) into a zip archive: each model's
        description JSON (with machine-local paths stripped) together with
        its data directory.

    Parameters
    ----------
    modelContainerOrListContainers : container or list of containers
        The model container(s) to pack.
    packFileName : str
        The name of the output archive.

    Returns
    -------
        None
    """
    logger = get_classMethod_logger(self, "packModel")
    logger.info("Packing models")
    containers = modelContainerOrListContainers
    if not isinstance(containers, Iterable):
        containers = [containers]  # normalize the single-container case.

    descriptions = dict()
    payload = []
    for container in containers:
        logger.info(f"Pack model {container.modelID}")
        document = container.getModelDocument()
        logger.debug("Remove all the locations of the classes. They will be found again when loading")
        cleanedDesc = remove_key_recursively_new(document.desc, "filepath")
        logger.debug(f"Remove all the prefix of the dataset. We assume that it is relative to the files diction of the project (={self.filesDirectory}")
        cleanedDesc = remove_prefix_from_values(cleanedDesc, "pathToData", self.filesDirectory)
        jsonFileName = f"{container.modelName}_{container.modelID}.json"
        descriptions[jsonFileName] = json.dumps(cleanedDesc, indent=4)
        logger.debug("Pack items")
        payload.append(document.getData())

    # The description JSONs go in last, alongside the data directories.
    payload.append(descriptions)
    zip_items(packFileName, payload)

loadPackedModel(archiveFile, overwrite=False)

Loads the model to the database, and extracts the runtime data to the directory.

If the model is in the database, we will overwrite the data if the overwrite is True.
Otherwise, it will skip.

Parameters:

Name Type Description Default
archiveFile
required
overwrite
False
Source code in hera/simulations/machineLearningDeepLearning/toolkit.py
def loadPackedModel(self,archiveFile,overwrite=False):
    """
        Loads the model to the database, and extracts the runtime data to the directory.

        If the model is in the database, we will overwrite the data if the overwrite is True.
        Otherwise, it will skip.

    Parameters
    ----------
    archiveFile : str
        The archive file created by packTorchModel (JSON descriptions + data files).
    overwrite : bool
        If True, re-extract the data even when the target directory already exists.

    Returns
    -------
        None
    """
    # NOTE(review): logger name 'packModel' looks copied from packTorchModel — confirm intended.
    logger = get_classMethod_logger(self,"packModel")
    logger.info("Unpacking models")

    # Each JSON file in the archive is one model description.
    models  = list_json_files_in_zip(archiveFile)
    for model in models:
        modelName = model['name']
        logger.info(f"Loading the model {modelName}")
        modelJSON = model['content']

        # Re-resolve class file paths and data paths for THIS machine.
        modelJSON = self.update_classes_filepath(modelJSON)
        modelJSON = self.append_filesDirectory_to_pathToData(modelJSON)

        modelContainer = self.getTorchModelFromJSON(modelJSON['model'])
        modelContainerDoc = modelContainer.getModelDocument()


        # Check if the directory exists.
        targetDataPath = modelContainerDoc.getData()
        if os.path.exists(targetDataPath) and not overwrite:
            err = f"Model {modelContainer.modelName} with the requested parameters already exists as model {modelContainer.modelID}).  Skipping unpacking since  overwrite flag is flase. If you want to overwrite call with overwrite=True"
            logger.error(err)
            #raise ValueError(err)
        else:
            os.makedirs(targetDataPath, exist_ok=True)
            # The archive entries of this model are prefixed by its name (without extension).
            origmodelName = modelName.split(".")[0]

            with zipfile.ZipFile(archiveFile, 'r') as zip_ref:
                for member in zip_ref.infolist():
                    memberName = member.filename
                    logger.debug(f"Checking item {memberName} in the archive")


                    # Only this model's data files; the .json description is not extracted.
                    if memberName.startswith(origmodelName) and ".json" not in memberName:
                        logger.debug(f"Changing path {origmodelName}->{targetDataPath} and write the file there")

                        # Rebase the archive-relative path onto the target data directory.
                        newNameList = [targetDataPath]+memberName.split(os.path.sep)[1:]
                        newFileName = os.path.join(*newNameList)
                        # NOTE(review): debug print left in — consider logger.debug instead.
                        print(newFileName)
                        os.makedirs(os.path.dirname(newFileName), exist_ok=True)

                        with zip_ref.open(memberName) as source, open(newFileName, 'wb') as target:
                            target.write(source.read())

append_filesDirectory_to_pathToData(modelJSON)

Parameters:

Name Type Description Default
modelJSON
required
Source code in hera/simulations/machineLearningDeepLearning/toolkit.py
def append_filesDirectory_to_pathToData(self, modelJSON):
    """
        Recursively prepend this project's filesDirectory to every
        "pathToData" string value found in the model JSON.

    Parameters
    ----------
    modelJSON : dict (or any JSON value)
        The model description to process; non-dict values are returned as-is.

    Returns
    -------
        A new structure with every "pathToData" value prefixed by
        self.filesDirectory.
    """
    if isinstance(modelJSON, dict):
        new_dict = {}
        for k, v in modelJSON.items():
            if k == "pathToData" and isinstance(v, str):
                new_dict[k] = os.path.join(self.filesDirectory, v)
            elif isinstance(v, dict):
                new_dict[k] = self.append_filesDirectory_to_pathToData(v)
            elif isinstance(v, list):
                # BUGFIX: recurse on each list *item*; the original passed the whole
                # list (v) back in, which replaced dict items with the list itself.
                new_dict[k] = [self.append_filesDirectory_to_pathToData(item) if isinstance(item, dict) else item
                               for item in v]
            else:
                new_dict[k] = v
        return new_dict
    else:
        return modelJSON

update_classes_filepath(modelJSON) classmethod

Recursively adds the correct files path to all the classpath of the machine.

Parameters:

Name Type Description Default
modelJSON dictionary to process
required
Source code in hera/simulations/machineLearningDeepLearning/toolkit.py
@classmethod
def update_classes_filepath(cls,modelJSON):
    """
    Recursively adds the correct files path to all the classpath of the machine.

    Every "classpath" string entry is resolved via cls.get_class_info and gets a
    matching "filepath" entry valid on the current machine.

    Parameters
    ----------
    modelJSON : dictionary to process

    Returns
    -------
        A new structure with "filepath" entries added next to every "classpath".
    """

    if isinstance(modelJSON, dict):
        new_dict = {}
        for k, v in modelJSON.items():
            if k == "classpath" and isinstance(v, str):
                _, fileData = cls.get_class_info(v)
                new_dict["classpath"] = v
                new_dict['filepath'] = fileData['filepath']
            elif isinstance(v, dict):
                new_dict[k] = cls.update_classes_filepath(v)
            elif isinstance(v, list):
                # BUGFIX: recurse on each list *item*; the original recursed on the
                # whole list (v), which replaced dict items with the list itself.
                new_dict[k] = [cls.update_classes_filepath(item) if isinstance(item, dict) else item
                               for item in v]
            else:
                new_dict[k] = v
        return new_dict
    else:
        return modelJSON