Hermes Workflow Toolkit¶
The hermesWorkflowToolkit is the database-backed workflow orchestrator that wraps the Hermes library. It manages the full lifecycle of simulation workflows: creating them from JSON, storing them in MongoDB with auto-generated names, building them into Luigi task DAGs, executing them, and comparing parameter variations.
OFToolkit inherits from hermesWorkflowToolkit, gaining workflow support automatically.
Architecture overview¶
Relationship to Hermes¶
The toolkit is a wrapper around the Hermes workflow engine:
| Layer | Class | Role |
|---|---|---|
| Hera toolkit | hermesWorkflowToolkit |
MongoDB storage, naming, retrieval, comparison, execution orchestration |
| Hermes workflow | hermes.workflow |
JSON → task DAG construction, parameter extraction, node management |
| Hermes engine | hermes.engines.luigi.builder |
Task DAG → Luigi Python module code generation |
| Hermes wrapper | hermes.taskwrapper.wrapper |
Wraps each node into a TaskWrapper with dependencies, parameters, and executer resolution |
| Luigi | luigi.Task |
Dependency-based execution engine with target-file state tracking |
Class hierarchy¶
abstractToolkit
↓
hermesWorkflowToolkit (hera/simulations/hermesWorkflowToolkit.py)
├─ OFToolkit (hera/simulations/openFoam/toolkit.py)
│ └─ adds OFObjectHome, Analysis, Presentation, solver extensions
└─ workflowToolkit [LSM variant] (hera/simulations/LSM/hermesWorkflowToolkit.py)
└─ imports hermes handlers (handler_build, handler_expand, handler_execute)
Toolkit extension pattern¶
Solver-specific functionality is added to OFToolkit via toolkit extensions — composition objects that hold a back-reference to the parent toolkit and provide solver-specific methods. This avoids putting all solver logic into a single class.
# In OFToolkit.__init__:
self.stochasticLagrangian = StochasticLagrangianSolver_toolkitExtension(self)
self.buoyantReactingFoam = buoyantReactingFoam_toolkitExtension(self)
# Usage:
toolkit.stochasticLagrangian.createDispersionFlowField(...)
toolkit.buoyantReactingFoam.IC_getHydrostaticPressure(...)
Each extension follows this pattern:
class absractEulerianSolver_toolkitExtension:
toolkit = None # back-reference to parent OFToolkit
analysis = None # optional analysis sub-layer
presentation = None # optional presentation sub-layer
def __init__(self, toolkit, solverName, incompressible):
self.toolkit = toolkit
self.solverName = solverName
self.incompressible = incompressible
Extension hierarchy:
absractEulerianSolver_toolkitExtension
├─ simpleFoam_toolkitExtension
└─ buoyantReactingFoam_toolkitExtension
absractStochasticLagrangianSolver_toolkitExtension
└─ StochasticLagrangianSolver_toolkitExtension
Extensions access the parent toolkit's data layer, workflow management, and mesh utilities via self.toolkit. This means they can query MongoDB, read fields, and manage workflows without duplicating any infrastructure:
# Inside an extension method:
docList = self.toolkit.getWorkflowListDocumentFromDB(flowName)
cellCenters = self.toolkit.getMesh(caseDirectory)
self.toolkit.OFObjectHome.readFieldFromCase(...)
Key extension methods:
| Extension | Method | Purpose |
|---|---|---|
stochasticLagrangian |
createDispersionFlowField() |
Set up dispersion case from base flow (time mapping, mesh linking) |
stochasticLagrangian |
createDispersionCaseDirectory() |
Validate DB consistency, manage directory conflicts |
stochasticLagrangian |
createAndLinkDispersionCaseDirectory() |
Copy system/constant, symlink processor meshes |
stochasticLagrangian |
writeParticlePositionFile() |
Generate source geometry for particle release |
buoyantReactingFoam |
IC_getHydrostaticPressure() |
Compute hydrostatic pressure field with boundary handling |
Key constants¶
DOCTYPE_WORKFLOW = "hermesWorkflow" # MongoDB document type
DESC_GROUPNAME = "groupName" # desc field keys
DESC_GROUPID = "groupID"
DESC_WORKFLOWNAME = "workflowName"
DESC_PARAMETERS = "parameters"
Workflow JSON structure¶
A workflow is a directed acyclic graph (DAG) of nodes, where each node represents a simulation step (copy files, generate mesh, run solver, etc.):
{
"workflow": {
"solver": "simpleFoam",
"root": "finalnode_xx",
"nodeList": ["blockMesh", "decomposePar", "simpleFoam", "reconstructPar"],
"nodes": {
"blockMesh": {
"type": "openFOAM.mesh.BlockMesh",
"Execution": {
"input_parameters": {
"vertices": [[0,0,0], [100,0,0], ...],
"cellCount": [50, 50, 20]
}
},
"requires": []
},
"simpleFoam": {
"type": "openFOAM.system.ControlDict",
"Execution": {
"input_parameters": {
"executeDir": "{blockMesh.output}",
"endTime": 1000
}
},
"requires": ["blockMesh", "decomposePar"]
}
}
}
}
Parameter referencing¶
Nodes reference outputs from other nodes using curly-brace syntax:
| Pattern | Meaning |
|---|---|
{nodeName.param} |
Output parameter from another node |
{workflow.param} |
Workflow-level parameter |
{WebGui.formData.field} |
GUI form data (Hermes workbench) |
These references are resolved by the TaskWrapper to build the dependency graph automatically.
Workflow lifecycle¶
1. Create: load workflow from JSON¶
# Dynamic class resolution based on solver field.
# If solver is null → hermes.workflow (generic)
# If solver is "simpleFoam" → hera.simulations.openFoam.OFWorkflow.workflow_simpleFoam
wf = toolkit.getHermesWorkflowFromJSON(
workflow="path/to/workflow.json", # or dict or JSON string
name="my_workflow",
resource="/path/to/case",
)
Internally, pydoc.locate() dynamically imports the workflow class based on the solver field. This allows solver-specific workflow subclasses to add custom node types and validation.
2. Add to database¶
doc = toolkit.addWorkflowToGroup(
workflowJSON="path/to/workflow.json",
groupName="dispersion",
writeWorkflowToFile=False,
resource=None, # auto-generated if None
)
# Creates document: dispersion_0001 (auto-incremented)
Idempotent add: The method first queries the DB by parameter content. If an identical workflow already exists, it returns the existing document instead of creating a duplicate.
What gets stored:
doc = self.addSimulationsDocument(
resource=resource,
dataFormat=datatypes.STRING,
type="hermesWorkflow",
desc={
"groupName": groupName,
"groupID": groupID,
"workflowName": workflowName, # e.g. "dispersion_0001"
"solver": hermesWF.solver,
"workflow": hermesWF.json, # full workflow JSON
"parameters": hermesWF.parametersJSON, # flattened for querying
}
)
The parameters are stored separately from the full workflow JSON to enable efficient MongoDB queries via dictToMongoQuery() without parsing the workflow tree.
3. Build: generate Luigi tasks¶
The build process:
hermes.workflow._buildNetwork()traverses the JSON and createsTaskWrapperobjects for each node- Each
TaskWrapperextracts its dependencies by scanning{node.param}references in input parameters LuigiBuilder.buildWorkflow()converts the task graph into a Python module containing Luigi Task classes- The generated module has one class per task (e.g.
blockMesh_0,simpleFoam_0) with: requires()→ returns upstream task instancesoutput()→ returnsluigi.LocalTargetfor state trackingrun()→ calls the Hermes executer for that node type
4. Execute: run Luigi pipeline¶
Execution steps:
- Retrieve workflow document from DB
- Reconstruct
hermes.workflowobject from stored JSON - Build Luigi task DAG (generates Python module)
- Write workflow JSON + Python module to
filesDirectory - Clean previous targets — remove
{name}_targetFiles/to force re-execution - Run Luigi via command line:
python3 -m luigi --module {name} finalnode_xx_0 --local-scheduler - Clean up generated Python module (JSON stays)
The finalnode_xx is a synthetic terminal node that depends on all actual nodes — executing it triggers the entire DAG.
5. Compare and retrieve¶
Cascading search — getWorkflowDocumentFromDB(input) tries four strategies in order:
- By name — exact match on
workflowName(e.g."dispersion_0001") - By resource — match by file/directory path
- By group name — returns all workflows in the group (e.g.
"dispersion") - By JSON content — parses input as JSON, extracts parameters, queries by flattened parameter values via
dictToMongoQuery()
Comparison:
# Compare all workflows in a group
diff = toolkit.compareWorkflowInGroup("dispersion")
# Returns DataFrame: rows = differing parameters, columns = workflow names
# Compare specific workflows
diff = toolkit.compareWorkflows(["dispersion_0001", "dispersion_0002"])
# List all workflows in a group
toolkit.listWorkflows("dispersion", listNodes=True, listParameters=True)
MongoDB storage¶
Document schema¶
{
"_id": "ObjectId",
"_cls": "Metadata.Simulations",
"projectName": "MY_PROJECT",
"type": "hermesWorkflow",
"resource": "/path/to/dispersion_0001",
"dataFormat": "string",
"desc": {
"groupName": "dispersion",
"groupID": 1,
"workflowName": "dispersion_0001",
"solver": "simpleFoam",
"workflow": { "workflow": { "nodeList": [...], "nodes": {...} } },
"parameters": {
"blockMesh": { "vertices": [...], "cellCount": [...] },
"simpleFoam": { "endTime": 1000 }
}
}
}
Querying¶
All queries filter by type="hermesWorkflow". Parameters are queried using dictToMongoQuery() which flattens nested dicts into MongoEngine's double-underscore notation:
Group naming convention¶
Format: <groupName>_<zero-padded-4-digit-id>
# Examples: dispersion_0001, flow_0042, lsm_0003
formatted_number = "{0:04d}".format(flowID)
return f"{baseName}_{formatted_number}"
Counter mechanism: Each group has its own counter stored in the project config via getCounterAndAdd(groupName). The counter auto-increments on each addWorkflowToGroup() call.
Parsing names:
LSM variant¶
Source: hera/simulations/LSM/hermesWorkflowToolkit.py
The LSM workflowToolkit extends the base with additional handler imports for the expand → build → execute pipeline:
| Handler | Purpose |
|---|---|
handler_expand |
Expand workflow templates with meteorological data |
handler_build |
Build the workflow into executable tasks |
handler_buildExecute |
Combined build + execute in one step |
handler_execute |
Execute a pre-built workflow |
This supports LSM's pattern where workflows are first expanded with site-specific meteorological data before being built and executed. The base toolkit only uses build → execute.
Key methods reference¶
Workflow creation¶
| Method | Purpose |
|---|---|
getHermesWorkflowFromJSON(workflow, name, resource) |
Create workflow object from JSON (file/dict/string) |
getHemresWorkflowFromDocument(documentList, returnFirst) |
Reconstruct workflow from DB document |
updateDocumentWorkflow(document, workflow) |
Update stored JSON + parameters |
Workflow storage¶
| Method | Purpose |
|---|---|
addWorkflowToGroup(workflowJSON, groupName, ...) |
Add workflow to DB with auto-naming |
addWorkflowFileInGroup(workflowFilePath, write_file) |
Add from file, infer group from filename |
findAvailableName(simulationGroup) |
Get next available ID + name |
getworkFlowName(baseName, flowID) |
Format <base>_<padded_id> |
splitWorkflowName(workflow_name) |
Parse <base>_<id> |
Workflow retrieval¶
| Method | Purpose |
|---|---|
getHermesWorkflowFromDB(input, returnFirst) |
Retrieve as hermes.workflow object |
getWorkflowDocumentFromDB(input, doctype, dockind) |
Retrieve raw document (cascading search) |
getWorkflowDocumentByName(name, doctype, dockind) |
Retrieve by exact name |
getWorkflowListDocumentFromDB(input) |
Retrieve as list of documents |
getWorkflowDocumentsInGroup(groupName) |
All documents in a group |
getWorkflowListOfSolvers(solverName) |
All documents for a solver |
Comparison and listing¶
| Method | Purpose |
|---|---|
compareWorkflows(Workflow, longFormat, transpose) |
Compare workflows by name or list |
compareWorkflowInGroup(workflowGroup) |
Compare all in a group |
compareWorkflowObj(workflowList) |
Compare workflow objects directly |
listWorkflows(workflowGroup, listNodes, listParameters) |
List workflows with optional details |
listGroups(solver, workflowName) |
List all groups in the project |
workflowTable(workflowGroup) |
Alias for compareWorkflowInGroup |
Execution and cleanup¶
| Method | Purpose |
|---|---|
executeWorkflowFromDB(input) |
Build + execute via Luigi |
deleteWorkflowInGroup(workflowGroup, deepDelete, resetCounter) |
Delete workflows, optionally remove files |
Templates¶
| Method | Purpose |
|---|---|
listHermesSolverTemplates(solverName) |
List loaded templates for a solver |
getHermesFlowTemplate(hermesFlowName) |
Get a flow template |
listHermesNodesTemplates() |
List all node templates |
getHermesNodeTemplate(hermesNodeName) |
Get a node template |
Cross-references¶
| What | Where |
|---|---|
| User guide | Toolkits > Simulations > Hermes Workflows |
| OpenFOAM toolkit (inherits from hermesWorkflowToolkit) | Simulations > OpenFOAM |
| LSM toolkit | Simulations > LSM |
| Hermes library | pyHermes/hermes/ (workflow, engines, taskwrapper) |
| API reference | API > Simulations |
| Workflow examples | Examples > Workflows |