Skip to content

Measurements Toolkits API

Toolkits for managing GIS data, meteorological observations, and experimental measurements.

GIS - Vector

BuildingsToolkit

hera.measurements.GIS.vector.buildings.toolkit.BuildingsToolkit

Bases: VectorToolkit

Toolkit to manage the buildings. Reading the shapefile with geoDataFrame will result in dataframe with the following columns: - Geometry: the polygon of the building. - Building height column : the column name is in BuildingHeightColumn. Default value=BLDG_HT. - Land height : the columns name is in LandHeightColumn. Default value=HT_LAND.

Source code in hera/measurements/GIS/vector/buildings/toolkit.py
class BuildingsToolkit(VectorToolkit):
    """
    Toolkit to manage the buildings. Reading the shapefile with geoDataFrame will result in dataframe
    with the following columns:
        - Geometry: the polygon of the building.
        - Building height column : the column name is in BuildingHeightColumn. Default value=BLDG_HT.
        - Land height  : the columns name is in LandHeightColumn. Default value=HT_LAND.
    """
    def __init__(self, projectName, filesDirectory=None,connectionName=None):
        """Initialize the buildings toolkit.

        Parameters
        ----------
        projectName : str
            Name of the project.
        filesDirectory : str or None, optional
            Path to the files directory.
        connectionName : str or None, optional
            Database connection name.
        """
        super().__init__(projectName=projectName, toolkitName="Buildings", filesDirectory=filesDirectory,connectionName=connectionName)
        self._analysis = analysis(dataLayer=self)
        self._presentation = presentation(dataLayer=self, analysisLayer=self._analysis)

    def getBuildingHeightFromRasterTopographyToolkit(self, buildingData, topographyDataSource=None):
        """
        Get the topography height of each building (at its center) in the building data using the topography toolkit. Return data frame wtih 'evaluation' as a column.

        Parameters
        ----------
        buildingData : geopandas.geoDataFrame.
            The building.

        topographyDataSource : string,default=None.
            The name of the datasource in the topography toolkit. If None, use the default datasource there.

        Returns
        -------
            geopandas.DataFrame
        """
        topotk = toolkitHome.getToolkit(toolkitName=toolkitHome.GIS_RASTER_TOPOGRAPHY, projectName=self.projectName)
        elevations = topotk.getPointListElevation(buildingData.centroid.to_crs(WGS84))
        return buildingData.join(elevations)

    def buildingsGeopandasToSTLRasterTopography(self,
                                                buildingData,
                                                buildingHeightColumn,
                                                buildingElevationColumn,
                                                outputFileName,
                                                flatTerrain = False,
                                                referenceTopography = 0,
                                                nonFlatTopographyShift=10):
        """
        Converts a building data (in geopandas format) to STL using the FreeCAD module.
        Using the raster topography to estimate the height of each building.
        This is a low level procedure. It can be used, but the easier way to use the toolkit is to generate the buildings from an area using the regionToSTL procedure.
        We must save the file to the disk, as it is the current implementation of FreeCAD.

        Parameters
        ----------
        buildingData : geopandas.DataFrame
            The buildings data.

        buildingHeightColumn : string
            The name of the column that holds the height of the buildings in [m].

        buildingElevationColumn: string
            The name of the column that holds the elevation of the building.

        outputFileName : string
            The absolute path of the output STL.

        flatTerrain : bool
            If true, use a flat terrain.

        nonFlatTopographyShift : float
            Shift the house with respect to its height in the topography.

        referenceTopography : float [default 0]
            If flatTerrain, use this as the reference height for the buildings.

        Returns
        -------

        """
        logger = get_classMethod_logger(self, "geoPandasToSTL")
        logger.info(f"Converting {len(buildingData)} to STL. Using {'flat' if flatTerrain else 'topography'} settings")

        try:
            FreeCADDOC = FreeCAD.newDocument("Unnamed")
        except:
            err = "FreeCAD not found. Install before using this function and add to the PYTHONPATH"
            raise ValueError(err)

        for indx, building in buildingData.iterrows():  # converting al the buildings

            try:
                walls = building['geometry'].exterior.xy
                walls = numpy.stack(walls).T
            except:
                continue

            if indx % 100 == 0:
                logger.debug(f"{indx}/{len(buildingData)} shape file is executed")

            wallsheight = building[buildingHeightColumn]
            altitude = referenceTopography if flatTerrain else building[buildingElevationColumn] - nonFlatTopographyShift
            logger.debug(f" Building -- {indx} --")
            logger.debug("newSketch = FreeCADDOC.addObject('Sketcher::SketchObject', 'Sketch" + str(indx) + "')")
            newSketch = FreeCADDOC.addObject('Sketcher::SketchObject', 'Sketch' + str(indx))

            logger.debug(
                f"newSketch.Placement = FreeCAD.Placement(FreeCAD.Vector(0.000000, 0.000000, {altitude}), FreeCAD.Rotation(0.000000, 0.000000, 0.000000, 1.000000))")
            newSketch.Placement = FreeCAD.Placement(FreeCAD.Vector(0.000000, 0.000000, altitude),  # 2*k-1
                                                    FreeCAD.Rotation(0.000000, 0.000000, 0.000000, 1.000000))

            for xy0, xy1 in zip(walls[:-1], walls[1:]):
                logger.debug(
                    f"newSketch.addGeometry(Part.LineSegment(FreeCAD.Vector({xy0[0]}, {xy0[1]}, {altitude}),FreeCAD.Vector({xy1[0]}, {xy1[1]}, {altitude})))")
                newSketch.addGeometry(
                    Part.LineSegment(FreeCAD.Vector(xy0[0], xy0[1], altitude),
                                     FreeCAD.Vector(xy1[0], xy1[1], altitude)))

            logger.debug("FreeCADDOC.addObject('Part::Extrusion', 'building" + str(indx) + "')")
            newPad = FreeCADDOC.addObject("Part::Extrusion", "building" + str(indx))
            logger.debug("newPad.Base = newSketch")
            newPad.Base = newSketch
            buildingTopAltitude = wallsheight + nonFlatTopographyShift # the paddign is from the bottom of the buildings, which is nonFlatTopographyShift lower.
            logger.debug(f"newPad.LengthFwd = {buildingTopAltitude}")
            newPad.LengthFwd = buildingTopAltitude
            logger.debug("newPad.Solid = True")
            newPad.Solid = True
            logger.debug("newPad.Symmetric = False")
            newPad.Symmetric = False
            FreeCADDOC.recompute()

        logger.info(f"Writing the STL {outputFileName}")
        Mesh.export(FreeCADDOC.Objects, outputFileName)


    def getBuildingsFromRectangle(self, minx, miny, maxx, maxy, dataSourceName=None, inputCRS=WGS84, withElevation=False):
        """
        Return the buildings geopandas for the rectangle region.

        Parameters
        ----------
        minx: float
            Minimum value of x-axis.

        miny: float
            Minimum value of y-axis.

        maxx: float
            Maximum value of x-axis.

        may: float
            Maximum value of y-axis.

        dataSourceName: str,default=None.
            The datasource name. If none, will use the default datasource.

        withElevation : bool,default=False.
            If True, use the topography (raster) toolkit to get the heights.

        Returns
        -------
            geopandas.DataFrame
        """
        if dataSourceName is None:
            dataSourceName = self.getConfig()["defaultBuildingDataSource"]

        doc = self.getDataSourceDocument(dataSourceName)
        buildings = self.cutRegionFromSource(doc, shape=[minx, miny, maxx, maxy], inputCRS=inputCRS)

        if withElevation:
            buildings = self.getBuildingHeightFromRasterTopographyToolkit(buildings)

        return buildings
    @staticmethod
    def get_buildings_height(gdf):
        """
        Extract building names, geometries(coordination), and height information from a GeoDataFrame.
        if there is no height available - it will calculate the number of levels in the building * 3/
        else: none

        Parameters:
        gdf (GeoDataFrame): A GeoPandas DataFrame containing building geometries
                            and associated properties.

        Returns:
        geopandas.DataFrame

        """
        building_info = []

        for index, row in gdf.iterrows():
            name = row.get('name', 'Unnamed')  # Extract the building name

            # Retain the original geometry
            geometry = row.geometry

            # Try to get the height or number of levels
            height = row.get('height')
            levels = row.get('building:levels')

            # Determine the height value
            if height is None and levels is not None:
                height = levels * 3  # Estimate height based on number of levels

            # Append the building information
            building_info.append({
                'name': name,
                'geometry': geometry,  # Keep the original geometry
                'height': height
            })

        return gpd.GeoDataFrame(building_info)
    @staticmethod
    def filter_buildings_in_area(buildings_data, min_longitude, min_latitude, max_longitude, max_latitude):
        """
        Filter building features by a specified geographic area using a bounding box.

        Parameters:
        ----------
        buildings_data : dict
            A GeoJSON-like dictionary containing building features to filter.

        min_longitude : float
            Minimum longitude defining the bounding box.

        min_latitude : float
            Minimum latitude defining the bounding box.

        max_longitude : float
            Maximum longitude defining the bounding box.

        max_latitude : float
            Maximum latitude defining the bounding box.

        Returns:
        -------
        gpd.GeoDataFrame
            A GeoDataFrame containing buildings that are located within the specified area.
        """
        # Create a polygon from the bounding box
        bbox_polygon = Polygon([(min_longitude, min_latitude),
                                (max_longitude, min_latitude),
                                (max_longitude, max_latitude),
                                (min_longitude, max_latitude),
                                (min_longitude, min_latitude)])  # Close the polygon

        # List to hold filtered building data
        filtered_buildings = []

        for feature in buildings_data['features']:
            geometry = feature['geometry']
            properties = feature['properties']

            # Create a geometry shape from the geometry data
            geom = shape(geometry)  # Handles Point, Polygon, MultiPolygon, etc.

            # Check if the geometry intersects with the bounding box polygon
            if geom.intersects(bbox_polygon):
                # Add all properties and the geometry
                properties['geometry'] = geom
                filtered_buildings.append(properties)

        # Create a GeoDataFrame
        gdf = gpd.GeoDataFrame(filtered_buildings)

        return gdf

__init__(projectName, filesDirectory=None, connectionName=None)

Initialize the buildings toolkit.

Parameters:

Name Type Description Default
projectName str

Name of the project.

required
filesDirectory str or None

Path to the files directory.

None
connectionName str or None

Database connection name.

None
Source code in hera/measurements/GIS/vector/buildings/toolkit.py
def __init__(self, projectName, filesDirectory=None,connectionName=None):
    """Initialize the buildings toolkit.

    Parameters
    ----------
    projectName : str
        Name of the project.
    filesDirectory : str or None, optional
        Path to the files directory.
    connectionName : str or None, optional
        Database connection name.
    """
    super().__init__(projectName=projectName, toolkitName="Buildings", filesDirectory=filesDirectory,connectionName=connectionName)
    self._analysis = analysis(dataLayer=self)
    self._presentation = presentation(dataLayer=self, analysisLayer=self._analysis)

getBuildingHeightFromRasterTopographyToolkit(buildingData, topographyDataSource=None)

Get the topography height of each building (at its center) in the building data using the topography toolkit. Return data frame wtih 'evaluation' as a column.

Parameters:

Name Type Description Default
buildingData geopandas.geoDataFrame.

The building.

required
topographyDataSource string,default=None.

The name of the datasource in the topography toolkit. If None, use the default datasource there.

None

Returns:

Type Description
geopandas.DataFrame
Source code in hera/measurements/GIS/vector/buildings/toolkit.py
def getBuildingHeightFromRasterTopographyToolkit(self, buildingData, topographyDataSource=None):
    """
    Get the topography height of each building (at its center) in the building data using the topography toolkit. Return data frame wtih 'evaluation' as a column.

    Parameters
    ----------
    buildingData : geopandas.geoDataFrame.
        The building.

    topographyDataSource : string,default=None.
        The name of the datasource in the topography toolkit. If None, use the default datasource there.

    Returns
    -------
        geopandas.DataFrame
    """
    topotk = toolkitHome.getToolkit(toolkitName=toolkitHome.GIS_RASTER_TOPOGRAPHY, projectName=self.projectName)
    elevations = topotk.getPointListElevation(buildingData.centroid.to_crs(WGS84))
    return buildingData.join(elevations)

buildingsGeopandasToSTLRasterTopography(buildingData, buildingHeightColumn, buildingElevationColumn, outputFileName, flatTerrain=False, referenceTopography=0, nonFlatTopographyShift=10)

Converts a building data (in geopandas format) to STL using the FreeCAD module. Using the raster topography to estimate the height of each building. This is a low level procedure. It can be used, but the easier way to use the toolkit is to generate the buildings from an area using the regionToSTL procedure. We must save the file to the disk, as it is the current implementation of FreeCAD.

Parameters:

Name Type Description Default
buildingData DataFrame

The buildings data.

required
buildingHeightColumn string

The name of the column that holds the height of the buildings in [m].

required
buildingElevationColumn

The name of the column that holds the elevation of the building.

required
outputFileName string

The absolute path of the output STL.

required
flatTerrain bool

If true, use a flat terrain.

False
nonFlatTopographyShift float

Shift the house with respect to its height in the topography.

10
referenceTopography float [default 0]

If flatTerrain, use this as the reference height for the buildings.

0
Source code in hera/measurements/GIS/vector/buildings/toolkit.py
def buildingsGeopandasToSTLRasterTopography(self,
                                            buildingData,
                                            buildingHeightColumn,
                                            buildingElevationColumn,
                                            outputFileName,
                                            flatTerrain = False,
                                            referenceTopography = 0,
                                            nonFlatTopographyShift=10):
    """
    Converts a building data (in geopandas format) to STL using the FreeCAD module.
    Using the raster topography to estimate the height of each building.
    This is a low level procedure. It can be used, but the easier way to use the toolkit is to generate the buildings from an area using the regionToSTL procedure.
    We must save the file to the disk, as it is the current implementation of FreeCAD.

    Parameters
    ----------
    buildingData : geopandas.DataFrame
        The buildings data.

    buildingHeightColumn : string
        The name of the column that holds the height of the buildings in [m].

    buildingElevationColumn: string
        The name of the column that holds the elevation of the building.

    outputFileName : string
        The absolute path of the output STL.

    flatTerrain : bool
        If true, use a flat terrain.

    nonFlatTopographyShift : float
        Shift the house with respect to its height in the topography.

    referenceTopography : float [default 0]
        If flatTerrain, use this as the reference height for the buildings.

    Returns
    -------

    """
    logger = get_classMethod_logger(self, "geoPandasToSTL")
    logger.info(f"Converting {len(buildingData)} to STL. Using {'flat' if flatTerrain else 'topography'} settings")

    try:
        FreeCADDOC = FreeCAD.newDocument("Unnamed")
    except:
        err = "FreeCAD not found. Install before using this function and add to the PYTHONPATH"
        raise ValueError(err)

    for indx, building in buildingData.iterrows():  # converting al the buildings

        try:
            walls = building['geometry'].exterior.xy
            walls = numpy.stack(walls).T
        except:
            continue

        if indx % 100 == 0:
            logger.debug(f"{indx}/{len(buildingData)} shape file is executed")

        wallsheight = building[buildingHeightColumn]
        altitude = referenceTopography if flatTerrain else building[buildingElevationColumn] - nonFlatTopographyShift
        logger.debug(f" Building -- {indx} --")
        logger.debug("newSketch = FreeCADDOC.addObject('Sketcher::SketchObject', 'Sketch" + str(indx) + "')")
        newSketch = FreeCADDOC.addObject('Sketcher::SketchObject', 'Sketch' + str(indx))

        logger.debug(
            f"newSketch.Placement = FreeCAD.Placement(FreeCAD.Vector(0.000000, 0.000000, {altitude}), FreeCAD.Rotation(0.000000, 0.000000, 0.000000, 1.000000))")
        newSketch.Placement = FreeCAD.Placement(FreeCAD.Vector(0.000000, 0.000000, altitude),  # 2*k-1
                                                FreeCAD.Rotation(0.000000, 0.000000, 0.000000, 1.000000))

        for xy0, xy1 in zip(walls[:-1], walls[1:]):
            logger.debug(
                f"newSketch.addGeometry(Part.LineSegment(FreeCAD.Vector({xy0[0]}, {xy0[1]}, {altitude}),FreeCAD.Vector({xy1[0]}, {xy1[1]}, {altitude})))")
            newSketch.addGeometry(
                Part.LineSegment(FreeCAD.Vector(xy0[0], xy0[1], altitude),
                                 FreeCAD.Vector(xy1[0], xy1[1], altitude)))

        logger.debug("FreeCADDOC.addObject('Part::Extrusion', 'building" + str(indx) + "')")
        newPad = FreeCADDOC.addObject("Part::Extrusion", "building" + str(indx))
        logger.debug("newPad.Base = newSketch")
        newPad.Base = newSketch
        buildingTopAltitude = wallsheight + nonFlatTopographyShift # the paddign is from the bottom of the buildings, which is nonFlatTopographyShift lower.
        logger.debug(f"newPad.LengthFwd = {buildingTopAltitude}")
        newPad.LengthFwd = buildingTopAltitude
        logger.debug("newPad.Solid = True")
        newPad.Solid = True
        logger.debug("newPad.Symmetric = False")
        newPad.Symmetric = False
        FreeCADDOC.recompute()

    logger.info(f"Writing the STL {outputFileName}")
    Mesh.export(FreeCADDOC.Objects, outputFileName)

getBuildingsFromRectangle(minx, miny, maxx, maxy, dataSourceName=None, inputCRS=WGS84, withElevation=False)

Return the buildings geopandas for the rectangle region.

Parameters:

Name Type Description Default
minx

Minimum value of x-axis.

required
miny

Minimum value of y-axis.

required
maxx

Maximum value of x-axis.

required
may

Maximum value of y-axis.

required
dataSourceName

The datasource name. If none, will use the default datasource.

None
withElevation bool,default=False.

If True, use the topography (raster) toolkit to get the heights.

False

Returns:

Type Description
geopandas.DataFrame
Source code in hera/measurements/GIS/vector/buildings/toolkit.py
def getBuildingsFromRectangle(self, minx, miny, maxx, maxy, dataSourceName=None, inputCRS=WGS84, withElevation=False):
    """
    Return the buildings geopandas for the rectangle region.

    Parameters
    ----------
    minx: float
        Minimum value of x-axis.

    miny: float
        Minimum value of y-axis.

    maxx: float
        Maximum value of x-axis.

    may: float
        Maximum value of y-axis.

    dataSourceName: str,default=None.
        The datasource name. If none, will use the default datasource.

    withElevation : bool,default=False.
        If True, use the topography (raster) toolkit to get the heights.

    Returns
    -------
        geopandas.DataFrame
    """
    if dataSourceName is None:
        dataSourceName = self.getConfig()["defaultBuildingDataSource"]

    doc = self.getDataSourceDocument(dataSourceName)
    buildings = self.cutRegionFromSource(doc, shape=[minx, miny, maxx, maxy], inputCRS=inputCRS)

    if withElevation:
        buildings = self.getBuildingHeightFromRasterTopographyToolkit(buildings)

    return buildings

get_buildings_height(gdf) staticmethod

Extract building names, geometries(coordination), and height information from a GeoDataFrame. if there is no height available - it will calculate the number of levels in the building * 3/ else: none

Parameters: gdf (GeoDataFrame): A GeoPandas DataFrame containing building geometries and associated properties.

Returns: geopandas.DataFrame

Source code in hera/measurements/GIS/vector/buildings/toolkit.py
@staticmethod
def get_buildings_height(gdf):
    """
    Extract building names, geometries(coordination), and height information from a GeoDataFrame.
    if there is no height available - it will calculate the number of levels in the building * 3/
    else: none

    Parameters:
    gdf (GeoDataFrame): A GeoPandas DataFrame containing building geometries
                        and associated properties.

    Returns:
    geopandas.DataFrame

    """
    building_info = []

    for index, row in gdf.iterrows():
        name = row.get('name', 'Unnamed')  # Extract the building name

        # Retain the original geometry
        geometry = row.geometry

        # Try to get the height or number of levels
        height = row.get('height')
        levels = row.get('building:levels')

        # Determine the height value
        if height is None and levels is not None:
            height = levels * 3  # Estimate height based on number of levels

        # Append the building information
        building_info.append({
            'name': name,
            'geometry': geometry,  # Keep the original geometry
            'height': height
        })

    return gpd.GeoDataFrame(building_info)

filter_buildings_in_area(buildings_data, min_longitude, min_latitude, max_longitude, max_latitude) staticmethod

Filter building features by a specified geographic area using a bounding box.

Parameters:

buildings_data : dict A GeoJSON-like dictionary containing building features to filter.

min_longitude : float Minimum longitude defining the bounding box.

min_latitude : float Minimum latitude defining the bounding box.

max_longitude : float Maximum longitude defining the bounding box.

max_latitude : float Maximum latitude defining the bounding box.

Returns:

gpd.GeoDataFrame A GeoDataFrame containing buildings that are located within the specified area.

Source code in hera/measurements/GIS/vector/buildings/toolkit.py
@staticmethod
def filter_buildings_in_area(buildings_data, min_longitude, min_latitude, max_longitude, max_latitude):
    """
    Filter building features by a specified geographic area using a bounding box.

    Parameters:
    ----------
    buildings_data : dict
        A GeoJSON-like dictionary containing building features to filter.

    min_longitude : float
        Minimum longitude defining the bounding box.

    min_latitude : float
        Minimum latitude defining the bounding box.

    max_longitude : float
        Maximum longitude defining the bounding box.

    max_latitude : float
        Maximum latitude defining the bounding box.

    Returns:
    -------
    gpd.GeoDataFrame
        A GeoDataFrame containing buildings that are located within the specified area.
    """
    # Create a polygon from the bounding box
    bbox_polygon = Polygon([(min_longitude, min_latitude),
                            (max_longitude, min_latitude),
                            (max_longitude, max_latitude),
                            (min_longitude, max_latitude),
                            (min_longitude, min_latitude)])  # Close the polygon

    # List to hold filtered building data
    filtered_buildings = []

    for feature in buildings_data['features']:
        geometry = feature['geometry']
        properties = feature['properties']

        # Create a geometry shape from the geometry data
        geom = shape(geometry)  # Handles Point, Polygon, MultiPolygon, etc.

        # Check if the geometry intersects with the bounding box polygon
        if geom.intersects(bbox_polygon):
            # Add all properties and the geometry
            properties['geometry'] = geom
            filtered_buildings.append(properties)

    # Create a GeoDataFrame
    gdf = gpd.GeoDataFrame(filtered_buildings)

    return gdf

TopographyToolkit (Vector)

hera.measurements.GIS.vector.topography.TopographyToolkit

Bases: VectorToolkit

Toolkit for managing and analyzing vector topography data.

Source code in hera/measurements/GIS/vector/topography.py
class TopographyToolkit(VectorToolkit):
    """Toolkit for managing and analyzing vector topography data."""

    @property
    def stlFactory(self):
        """stlFactory : stlFactory
            The STL factory instance used for vector-to-STL conversions."""
        return self._stlFactory

    def __init__(self, projectName, filesDirectory="",connectionName=None):
        """Initialize the topography toolkit with project settings and analysis layer.

        Parameters
        ----------
        projectName : str
            Name of the project.
        filesDirectory : str, optional
            Path to the files directory.
        connectionName : str or None, optional
            Database connection name.
        """
        toolkitName = "Topography"
        super().__init__(projectName=projectName, filesDirectory=filesDirectory, toolkitName=toolkitName,connectionName=connectionName)
        self._analysis = analysis(projectName=projectName, dataLayer=self)
        self._toolkitname = toolkitName
        self._stlFactory = stlFactory()


    def cutRegionFromSource(self, shapeDataOrName, datasourceName, isBounds = False, crs = None): # If  shapeDataOrName is data: if is Bounds = True: use the Bbox of shape as the region, else use the shpae as the region
        """
            Cuts a the shape from the requested datasource

            It overrides the parent procedure because we need to perform intersection operation on the results.

        Parameters
        ----------
        shapeDataOrName: GeoDataFrame,dict,list,Polygon
            The shape data to use.

                        list - a box with the corners in [xmin,ymin,xmax,ymax]

        datasourceName : str
            The name of the satasource to cur from.

        isBounds : bool
            If true, use the bounding box fo the polygon.

        crs : int
            The EPSG of the coordinate system of the shape (if it is a shape and not in the dtasource ative coordinates).

        Returns
        -------

        """
        topography = super().cutRegionFromSource(shapeDataOrName=shapeDataOrName, datasourceName=datasourceName, isBounds =isBounds, crs =crs)
        logger = get_classMethod_logger(self, "cutRegionFromSource")
        if isinstance(shapeDataOrName, str):
            shape = self.getRegionData(shapeDataOrName)
        else:
            shape = self._RegionToGeopandas(shapeDataOrName, crs=crs)
            doc = self.getDatasourceDocument(datasourceName=datasourceName)
            logger.debug(f"The datasource {datasourceName} is pointing to {doc.resource}")
            doc.desc['desc'].update({'crs': 2039})
            if 'crs' not in doc.desc['desc']:
                logger.error(f"The datasource {datasourceName} has no CRS defined in the metadata. please add it")
                raise ValueError(f"The datasource {datasourceName} has no CRS defined in the metadata. please add it")

            if shape.crs is None:
                logger.debug("The region was defined without crs. Using the crs of the datasource.")
                shape.crs = doc.desc['desc']['crs']
                shape = shape.to_crs(topography.crs)

        topography['geometry'] = topography.intersection(shape.iloc[0].geometry)
        return topography



    def geoPandasToSTL(self,gpandas, dxdy=50, solidName="Topography"):
        """
            Transforsm the gpandas to STL.

        Parameters
        ----------
        gpandas
        dxdy
        solidName

        Returns
        -------

        """
        return self.stlFactory.vectorToSTL(gpandas, dxdy=dxdy, solidName="Topography")

    def regionToSTL(self, shapeDataOrName, dxdy, datasourceName, crs=None):
        """
            Converts a region in a vector height map (contours) to STL at requested resolution

            We always use the bounding box of the input shape .

        Parameters:
        -----------

            shapeDataOrName: str or polygon
                The shape that we will use.

            dxdy: float.
                the dimension of each cell in the mesh in meters

            dataSourceName: str
                The datasource to use.

            crs : int [optional]
                Used if the CRS of the shapeData is different than the CRS of the datasource.

        Returns
        -------
            str
            The STL string.

        """
        logger = get_classMethod_logger(self, "regionToSTL")
        logger.info("-- Start --")

        topography = self.cutRegionFromSource(shapeDataOrName, datasourceName=datasourceName, isBounds=True, crs=crs)


        if len(topography) == 0:
            logger.warning("The requested region is empty. ")
            stlstr = None
        else:
            stlstr = self.stlFactory.vectorToSTL(topography,dxdy=dxdy)

        logger.info("-- End --")

        return stlstr

    def toDEM(self,regionNameOrData,dxdy=50,**filters):

        """
            Creates a Data Elevation Model (DEM) format string of topography.

            Parameters
            ----------

            regionNameOrData: str, geoJSON string, or geodataframe.
                The name of the regions, geoJSON string or geodataframe.

            dxdy: float
                The resolution of the conversion .

            Returns
            -------

                The DEM string format.
        """

        if isinstance(regionNameOrData,str):
            data = self.getDatasourceData(datasourceName=regionNameOrData,**filters)
            if data is None:
                data = geopandas.read_file(io.StringIO(regionNameOrData))
        elif isinstance(regionNameOrData,geopandas.geodataframe.GeoDataFrame):
            data = regionNameOrData
        else:
            raise ValueError("regionNameOrData must be wither region name in the DB, geoJSON string or a geopandas.geodataframe")

        rasterized = self.stlFactory.rasterizeGeopandas(data, dxdy=dxdy)

        if numpy.ma.is_masked(rasterized['height']):
            nans = rasterized['height'].mask
        else:
            nans = numpy.isnan(rasterized['height'])

        notnans = numpy.logical_not(nans)

        grid_x = rasterized['x']
        grid_y = rasterized['y']

        xmin = grid_x.min()
        xmax = grid_x.max()
        ymin = grid_y.min()
        ymax = grid_y.max()
        Nx   = grid_x.shape[0]
        Ny   = grid_x.shape[1]


        rasterized['height'][nans] = scipy.interpolate.griddata((grid_x[notnans], grid_y[notnans]), rasterized['height'][notnans],
                                             (grid_x[nans], grid_y[nans]), method='nearest').ravel()

        DEMstring = f"{xmin} {xmax} {ymin} {ymax}\n{Nx} {Ny}\n"

        for i in range(Nx):
            for j in range(Ny):
                DEMstring += f"{(float(rasterized['height'][j, i]))} "
            DEMstring += "\n"

        return DEMstring

stlFactory property

stlFactory : stlFactory The STL factory instance used for vector-to-STL conversions.

__init__(projectName, filesDirectory='', connectionName=None)

Initialize the topography toolkit with project settings and analysis layer.

Parameters:

Name Type Description Default
projectName str

Name of the project.

required
filesDirectory str

Path to the files directory.

''
connectionName str or None

Database connection name.

None
Source code in hera/measurements/GIS/vector/topography.py
def __init__(self, projectName, filesDirectory="",connectionName=None):
    """Initialize the topography toolkit with project settings and analysis layer.

    Parameters
    ----------
    projectName : str
        Name of the project.
    filesDirectory : str, optional
        Path to the files directory.
    connectionName : str or None, optional
        Database connection name.
    """
    toolkitName = "Topography"
    super().__init__(projectName=projectName, filesDirectory=filesDirectory, toolkitName=toolkitName,connectionName=connectionName)
    self._analysis = analysis(projectName=projectName, dataLayer=self)
    self._toolkitname = toolkitName
    self._stlFactory = stlFactory()

cutRegionFromSource(shapeDataOrName, datasourceName, isBounds=False, crs=None)

Cuts a the shape from the requested datasource

It overrides the parent procedure because we need to perform intersection operation on the results.

Parameters:

Name Type Description Default
shapeDataOrName

The shape data to use.

        list - a box with the corners in [xmin,ymin,xmax,ymax]
required
datasourceName str

The name of the satasource to cur from.

required
isBounds bool

If true, use the bounding box fo the polygon.

False
crs int

The EPSG of the coordinate system of the shape (if it is a shape and not in the dtasource ative coordinates).

None
Source code in hera/measurements/GIS/vector/topography.py
def cutRegionFromSource(self, shapeDataOrName, datasourceName, isBounds = False, crs = None): # If  shapeDataOrName is data: if is Bounds = True: use the Bbox of shape as the region, else use the shpae as the region
    """
        Cuts a the shape from the requested datasource

        It overrides the parent procedure because we need to perform intersection operation on the results.

    Parameters
    ----------
    shapeDataOrName: GeoDataFrame,dict,list,Polygon
        The shape data to use.

                    list - a box with the corners in [xmin,ymin,xmax,ymax]

    datasourceName : str
        The name of the satasource to cur from.

    isBounds : bool
        If true, use the bounding box fo the polygon.

    crs : int
        The EPSG of the coordinate system of the shape (if it is a shape and not in the dtasource ative coordinates).

    Returns
    -------

    """
    topography = super().cutRegionFromSource(shapeDataOrName=shapeDataOrName, datasourceName=datasourceName, isBounds =isBounds, crs =crs)
    logger = get_classMethod_logger(self, "cutRegionFromSource")
    if isinstance(shapeDataOrName, str):
        shape = self.getRegionData(shapeDataOrName)
    else:
        shape = self._RegionToGeopandas(shapeDataOrName, crs=crs)
        doc = self.getDatasourceDocument(datasourceName=datasourceName)
        logger.debug(f"The datasource {datasourceName} is pointing to {doc.resource}")
        doc.desc['desc'].update({'crs': 2039})
        if 'crs' not in doc.desc['desc']:
            logger.error(f"The datasource {datasourceName} has no CRS defined in the metadata. please add it")
            raise ValueError(f"The datasource {datasourceName} has no CRS defined in the metadata. please add it")

        if shape.crs is None:
            logger.debug("The region was defined without crs. Using the crs of the datasource.")
            shape.crs = doc.desc['desc']['crs']
            shape = shape.to_crs(topography.crs)

    topography['geometry'] = topography.intersection(shape.iloc[0].geometry)
    return topography

geoPandasToSTL(gpandas, dxdy=50, solidName='Topography')

Transforsm the gpandas to STL.

Parameters:

Name Type Description Default
gpandas
required
dxdy
50
solidName
'Topography'
Source code in hera/measurements/GIS/vector/topography.py
def geoPandasToSTL(self,gpandas, dxdy=50, solidName="Topography"):
    """
        Transforsm the gpandas to STL.

    Parameters
    ----------
    gpandas
    dxdy
    solidName

    Returns
    -------

    """
    return self.stlFactory.vectorToSTL(gpandas, dxdy=dxdy, solidName="Topography")

regionToSTL(shapeDataOrName, dxdy, datasourceName, crs=None)

Converts a region in a vector height map (contours) to STL at requested resolution

We always use the bounding box of the input shape .
Parameters:
shapeDataOrName: str or polygon
    The shape that we will use.

dxdy: float.
    the dimension of each cell in the mesh in meters

dataSourceName: str
    The datasource to use.

crs : int [optional]
    Used if the CRS of the shapeData is different than the CRS of the datasource.

Returns:

Type Description
str

The STL string.

Source code in hera/measurements/GIS/vector/topography.py
def regionToSTL(self, shapeDataOrName, dxdy, datasourceName, crs=None):
    """
        Converts a region in a vector height map (contours) to STL at requested resolution

        We always use the bounding box of the input shape .

    Parameters:
    -----------

        shapeDataOrName: str or polygon
            The shape that we will use.

        dxdy: float.
            the dimension of each cell in the mesh in meters

        dataSourceName: str
            The datasource to use.

        crs : int [optional]
            Used if the CRS of the shapeData is different than the CRS of the datasource.

    Returns
    -------
        str
        The STL string.

    """
    logger = get_classMethod_logger(self, "regionToSTL")
    logger.info("-- Start --")

    topography = self.cutRegionFromSource(shapeDataOrName, datasourceName=datasourceName, isBounds=True, crs=crs)


    if len(topography) == 0:
        logger.warning("The requested region is empty. ")
        stlstr = None
    else:
        stlstr = self.stlFactory.vectorToSTL(topography,dxdy=dxdy)

    logger.info("-- End --")

    return stlstr

toDEM(regionNameOrData, dxdy=50, **filters)

Creates a Data Elevation Model (DEM) format string of topography.

Parameters:

Name Type Description Default
regionNameOrData

The name of the regions, geoJSON string or geodataframe.

required
dxdy

The resolution of the conversion .

50

Returns:

Type Description
The DEM string format.
Source code in hera/measurements/GIS/vector/topography.py
def toDEM(self,regionNameOrData,dxdy=50,**filters):

    """
        Creates a Data Elevation Model (DEM) format string of topography.

        Parameters
        ----------

        regionNameOrData: str, geoJSON string, or geodataframe.
            The name of the regions, geoJSON string or geodataframe.

        dxdy: float
            The resolution of the conversion .

        Returns
        -------

            The DEM string format.
    """

    if isinstance(regionNameOrData,str):
        data = self.getDatasourceData(datasourceName=regionNameOrData,**filters)
        if data is None:
            data = geopandas.read_file(io.StringIO(regionNameOrData))
    elif isinstance(regionNameOrData,geopandas.geodataframe.GeoDataFrame):
        data = regionNameOrData
    else:
        raise ValueError("regionNameOrData must be wither region name in the DB, geoJSON string or a geopandas.geodataframe")

    rasterized = self.stlFactory.rasterizeGeopandas(data, dxdy=dxdy)

    if numpy.ma.is_masked(rasterized['height']):
        nans = rasterized['height'].mask
    else:
        nans = numpy.isnan(rasterized['height'])

    notnans = numpy.logical_not(nans)

    grid_x = rasterized['x']
    grid_y = rasterized['y']

    xmin = grid_x.min()
    xmax = grid_x.max()
    ymin = grid_y.min()
    ymax = grid_y.max()
    Nx   = grid_x.shape[0]
    Ny   = grid_x.shape[1]


    rasterized['height'][nans] = scipy.interpolate.griddata((grid_x[notnans], grid_y[notnans]), rasterized['height'][notnans],
                                         (grid_x[nans], grid_y[nans]), method='nearest').ravel()

    DEMstring = f"{xmin} {xmax} {ymin} {ymax}\n{Nx} {Ny}\n"

    for i in range(Nx):
        for j in range(Ny):
            DEMstring += f"{(float(rasterized['height'][j, i]))} "
        DEMstring += "\n"

    return DEMstring

DemographyToolkit

hera.measurements.GIS.vector.demography.DemographyToolkit

Bases: VectorToolkit

A toolkit to manage demography data

Source code in hera/measurements/GIS/vector/demography.py
class DemographyToolkit(toolkit.VectorToolkit):
    """
        A toolkit to manage demography data

    """

    _populationTypes = None # A dictionary with the names of the population types.

    _buildings = None
    _analysis = None
    _presentation = None

    @property
    def buildings(self):
        """BuildingsToolkit : The buildings toolkit instance."""
        return self._buildings

    @property
    def shapes(self):
        """ShapesToolKit : The shapes toolkit instance."""
        return self._shapes

    @property
    def analysis(self):
        """analysis : The demography analysis layer."""
        return self._analysis

    @property
    def presentation(self):
        """presentation : The demography presentation layer."""
        return self._presentation

    @property
    def populationTypes(self):
        """dict : Mapping of population group names to column names."""
        return self._populationTypes

    def __init__(self, projectName, filesDirectory=None,connectionName=None):
        """
            Initializes the demography tool.

        Parameters
        ----------
        projectName: str
            The project name

        SourceOrData: str or geopandas or None.
            None:   Try to load the default data source. If does not exist, set data to None.
            str:    The name of the source in the DB that will be used as a data.
            geoDataFrame: Use this as the demography data. See documentation of the structure of the demographic dataframe.

        dataSourceVersion : tuple of integers
            If specified load this version of the data.

        """
        self._projectName = projectName
        super().__init__(projectName=projectName, toolkitName="Demography", filesDirectory=filesDirectory,connectionName=connectionName)
        self._analysis = analysis(self)
        self._presentation = presentation(self)

        self._populationTypes = {"All":"total_pop","Children":"age_0_14","Youth":"age_15_19",
                           "YoungAdults":"age_20_29","Adults":"age_30_64","Elderly":"age_65_up"}

        self._buildings = toolkitHome.getToolkit(toolkitName=toolkitHome.GIS_BUILDINGS,projectName=projectName)

    def setDefaultDirectory(self, fileDirectory, create=True):
        """
            Set the default directory for the project.

        Parameters
        ----------
        fileDirectory: str
                The path to save the regions in.
                The directory is created if create flag is true (and directory does not exist).

        create: bool
            If false and directory does not exist, raise a NotADirectoryError exception.

        Returns
        -------
            str, the path.
        """
        fllpath = os.path.abspath(fileDirectory)
        logger = get_classMethod_logger(self, "setDefaultDirectory")

        if not os.path.exists(fllpath):
            if create:
                logger.debug(f"Directory {fllpath} does not exist. create")
                # ✅ Changed: safer than using os.system("mkdir -p ...")
                os.makedirs(fllpath, exist_ok=True)
            else:
                raise NotADirectoryError(f"{fllpath} is not a directory, and create is False.")

        # ✅ Changed: removed call to self.setConfig(...) since it's not defined in base class
        # ✅ Replaced with direct assignment to self._FilesDirectory as done elsewhere in abstractToolkit
        self._FilesDirectory = fllpath

    def projectPolygonOnPopulation(self, shapelyPolygon, dataSourceOrData, populationTypes="All", dataSourceVersion=None):
        """Deprecated. Use ``analysis.calculatePopulationInPolygon`` instead."""
        import warnings
        warnings.warn("Depracted in Version 2.0.0+. Use datalayer.calculatePopulationInPolygon",
                      category=DeprecationWarning,
                      stacklevel=2)
        self.analysis.calculatePopulationInPolygon(shapelyPolygon=shapelyPolygon, dataSourceOrData=dataSourceOrData, dataSourceVersion=dataSourceVersion, populationTypes=populationTypes)  # 🔧 FIXED: Shape → shapelyPolygon

    @property
    def FilesDirectory(self):
        """str : Path to the files directory."""
        return self._FilesDirectory


    def loadData(self, regionaName, dataOrFileName, version=(0,0,1), metadata=dict(), overwrite=False):
        """
            Loading an demographic data to the DB

            Currently only an existing shp files/geojson files and replaces

            TODO:
                - Should be extended in the future to get a string and save it.
                - Extend to different save modes (i.e just save a file or warn if already exists).

        Parameters
        ----------
        regionaName: str
                The name of the region
        dataOrFileName: str
                Currently only a file name
        version: list
                A list of number to state the version of the data.
        metadata:
                any additional
        :return:
        """
        # ✅ We no longer delete the document manually – handled in addDataSource if needed
        self.addDataSource(
            dataSourceName=regionaName,
            resource=dataOrFileName,
            dataFormat=datatypes.GEOPANDAS,
            overwrite=overwrite,  # ✅ מוסיפים את זה
            **metadata
        )

buildings property

BuildingsToolkit : The buildings toolkit instance.

shapes property

ShapesToolKit : The shapes toolkit instance.

analysis property

analysis : The demography analysis layer.

presentation property

presentation : The demography presentation layer.

populationTypes property

dict : Mapping of population group names to column names.

FilesDirectory property

str : Path to the files directory.

__init__(projectName, filesDirectory=None, connectionName=None)

Initializes the demography tool.

Parameters:

Name Type Description Default
projectName

The project name

required
SourceOrData

None: Try to load the default data source. If does not exist, set data to None. str: The name of the source in the DB that will be used as a data. geoDataFrame: Use this as the demography data. See documentation of the structure of the demographic dataframe.

required
dataSourceVersion tuple of integers

If specified load this version of the data.

required
Source code in hera/measurements/GIS/vector/demography.py
def __init__(self, projectName, filesDirectory=None,connectionName=None):
    """
        Initializes the demography tool.

    Parameters
    ----------
    projectName: str
        The project name

    SourceOrData: str or geopandas or None.
        None:   Try to load the default data source. If does not exist, set data to None.
        str:    The name of the source in the DB that will be used as a data.
        geoDataFrame: Use this as the demography data. See documentation of the structure of the demographic dataframe.

    dataSourceVersion : tuple of integers
        If specified load this version of the data.

    """
    self._projectName = projectName
    super().__init__(projectName=projectName, toolkitName="Demography", filesDirectory=filesDirectory,connectionName=connectionName)
    self._analysis = analysis(self)
    self._presentation = presentation(self)

    self._populationTypes = {"All":"total_pop","Children":"age_0_14","Youth":"age_15_19",
                       "YoungAdults":"age_20_29","Adults":"age_30_64","Elderly":"age_65_up"}

    self._buildings = toolkitHome.getToolkit(toolkitName=toolkitHome.GIS_BUILDINGS,projectName=projectName)

setDefaultDirectory(fileDirectory, create=True)

Set the default directory for the project.

Parameters:

Name Type Description Default
fileDirectory
The path to save the regions in.
The directory is created if create flag is true (and directory does not exist).
required
create

If false and directory does not exist, raise a NotADirectoryError exception.

True

Returns:

Type Description
str, the path.
Source code in hera/measurements/GIS/vector/demography.py
def setDefaultDirectory(self, fileDirectory, create=True):
    """
        Set the default directory for the project.

    Parameters
    ----------
    fileDirectory: str
            The path to save the regions in.
            The directory is created if create flag is true (and directory does not exist).

    create: bool
        If false and directory does not exist, raise a NotADirectoryError exception.

    Returns
    -------
        str, the path.
    """
    fllpath = os.path.abspath(fileDirectory)
    logger = get_classMethod_logger(self, "setDefaultDirectory")

    if not os.path.exists(fllpath):
        if create:
            logger.debug(f"Directory {fllpath} does not exist. create")
            # ✅ Changed: safer than using os.system("mkdir -p ...")
            os.makedirs(fllpath, exist_ok=True)
        else:
            raise NotADirectoryError(f"{fllpath} is not a directory, and create is False.")

    # ✅ Changed: removed call to self.setConfig(...) since it's not defined in base class
    # ✅ Replaced with direct assignment to self._FilesDirectory as done elsewhere in abstractToolkit
    self._FilesDirectory = fllpath

projectPolygonOnPopulation(shapelyPolygon, dataSourceOrData, populationTypes='All', dataSourceVersion=None)

Deprecated. Use analysis.calculatePopulationInPolygon instead.

Source code in hera/measurements/GIS/vector/demography.py
def projectPolygonOnPopulation(self, shapelyPolygon, dataSourceOrData, populationTypes="All", dataSourceVersion=None):
    """Deprecated. Use ``analysis.calculatePopulationInPolygon`` instead."""
    import warnings
    warnings.warn("Depracted in Version 2.0.0+. Use datalayer.calculatePopulationInPolygon",
                  category=DeprecationWarning,
                  stacklevel=2)
    self.analysis.calculatePopulationInPolygon(shapelyPolygon=shapelyPolygon, dataSourceOrData=dataSourceOrData, dataSourceVersion=dataSourceVersion, populationTypes=populationTypes)  # 🔧 FIXED: Shape → shapelyPolygon

loadData(regionaName, dataOrFileName, version=(0, 0, 1), metadata=dict(), overwrite=False)

Loading an demographic data to the DB

Currently only an existing shp files/geojson files and replaces

TODO:
    - Should be extended in the future to get a string and save it.
    - Extend to different save modes (i.e just save a file or warn if already exists).

Parameters:

Name Type Description Default
regionaName
The name of the region
required
dataOrFileName
Currently only a file name
required
version
A list of number to state the version of the data.
(0, 0, 1)
metadata
any additional
dict()
Source code in hera/measurements/GIS/vector/demography.py
def loadData(self, regionaName, dataOrFileName, version=(0,0,1), metadata=dict(), overwrite=False):
    """
        Loading an demographic data to the DB

        Currently only an existing shp files/geojson files and replaces

        TODO:
            - Should be extended in the future to get a string and save it.
            - Extend to different save modes (i.e just save a file or warn if already exists).

    Parameters
    ----------
    regionaName: str
            The name of the region
    dataOrFileName: str
            Currently only a file name
    version: list
            A list of number to state the version of the data.
    metadata:
            any additional
    :return:
    """
    # ✅ We no longer delete the document manually – handled in addDataSource if needed
    self.addDataSource(
        dataSourceName=regionaName,
        resource=dataOrFileName,
        dataFormat=datatypes.GEOPANDAS,
        overwrite=overwrite,  # ✅ מוסיפים את זה
        **metadata
    )

DemographyToolkit — Presentation

hera.measurements.GIS.vector.demography.presentation

Presentation layer for the Demography toolkit.

Provides methods for visualizing population density and distribution on geographic maps.

Source code in hera/measurements/GIS/vector/demography.py
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
class presentation:
    """
    Presentation layer for the Demography toolkit.

    Provides methods for visualizing population density and distribution
    on geographic maps.
    """

    _datalayer = None

    @property
    def datalayer(self):
        """DemographyToolkit : Reference to the parent toolkit."""
        return self._datalayer

    def __init__(self, dataLayer):
        """
        Initialize the presentation layer.

        Parameters
        ----------
        dataLayer : DemographyToolkit
            The parent demography toolkit instance.
        """
        self._datalayer = dataLayer

    def plotPopulationDensity(self, data, populationType="total_pop",
                              density_units=None,
                              ax=None, figsize=(12, 10),
                              cmap="YlOrRd", vmin=None, vmax=None,
                              alpha=1.0, edgecolor="0.5", linewidth=0.3,
                              colorbar=True, colorbar_label=None,
                              title=None,
                              xlim=None, ylim=None,
                              inputCRS=None, outputCRS=ITM):
        """
        Plot population density as a choropleth map.

        Computes density as population / area for each polygon
        and plots a colored map.

        Example
        -------
        >>> from hera import toolkitHome
        >>> demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT")
        >>> census = demo.getDataSourceData("census_2020")
        >>> demo.presentation.plotPopulationDensity(census, density_units=ureg.km**2)

        Parameters
        ----------
        data : geopandas.GeoDataFrame
            The demographic data with geometry and population columns.
        populationType : str
            Column name for the population count (default: ``"total_pop"``).
        ax : matplotlib.axes.Axes, optional
            Axes to plot on. If None, creates a new figure.
        figsize : tuple of float
            Figure size (width, height) in inches. Default: (12, 10).
        cmap : str or matplotlib.colors.Colormap
            Colormap name or instance. Default: ``"YlOrRd"``.
        vmin : float, optional
            Minimum value for the color scale. If None, auto-detected.
        vmax : float, optional
            Maximum value for the color scale. If None, auto-detected.
        alpha : float
            Transparency of the polygons (0=transparent, 1=opaque). Default: 1.0.
        edgecolor : str
            Color of polygon edges. Default: ``"0.5"`` (gray).
        linewidth : float
            Width of polygon edges. Default: 0.3.
        colorbar : bool
            Whether to show a colorbar. Default: True.
        density_units : pint.Unit, optional
            The area unit for density calculation. The CRS determines the
            native area unit (e.g., m² for ITM). This parameter sets the
            denominator unit for the density display. Default: ``ureg.km**2``
            (people/km²). Examples: ``ureg.km**2``, ``ureg.m**2``,
            ``ureg.dunam`` (1000 m²), ``ureg.hectare``.
        colorbar_label : str, optional
            Label for the colorbar. If None, auto-generated from ``density_units``.
        title : str, optional
            Plot title.
        xlim : tuple of float, optional
            (xmin, xmax) limits for the x-axis. If None, auto from data.
        ylim : tuple of float, optional
            (ymin, ymax) limits for the y-axis. If None, auto from data.
        inputCRS : int or str, optional
            CRS of the input data if not already set on the GeoDataFrame.
            Use the constants ``WSG84`` (4326) or ``ITM`` (2039) from
            ``hera.measurements.GIS.utils``.
        outputCRS : int or str, optional
            CRS to reproject data before plotting.
            Use the constants ``WSG84`` (4326) or ``ITM`` (2039) from
            ``hera.measurements.GIS.utils``.

        Returns
        -------
        matplotlib.axes.Axes
            The axes with the plot.
        """
        if density_units is None:
            density_units = ureg.km ** 2

        plot_data = data.copy()

        if inputCRS is not None:
            plot_data = plot_data.set_crs(epsg=inputCRS if isinstance(inputCRS, int) else inputCRS, allow_override=True)

        if outputCRS is not None:
            plot_data = plot_data.to_crs(epsg=outputCRS if isinstance(outputCRS, int) else outputCRS)

        # Geometry area is in CRS native units (m² for ITM).
        # Convert to the requested density_units using pint.
        area_m2 = plot_data.geometry.area  # in m² (for projected CRS like ITM)
        conversion_factor = (1.0 * ureg.m ** 2).to(density_units).magnitude
        area_in_units = area_m2 * conversion_factor

        density_col = f"{populationType}_density"
        plot_data[density_col] = plot_data[populationType] / area_in_units
        plot_data[density_col] = plot_data[density_col].replace([np.inf, -np.inf], 0).fillna(0)

        # Build default colorbar label from units
        units_str = f"{density_units:~P}"  # pint short format, e.g. "km²"

        if ax is None:
            fig, ax = plt.subplots(1, 1, figsize=figsize)

        plot_data.plot(
            column=density_col,
            ax=ax,
            cmap=cmap,
            vmin=vmin,
            vmax=vmax,
            alpha=alpha,
            edgecolor=edgecolor,
            linewidth=linewidth,
            legend=False
        )

        if colorbar:
            sm = plt.cm.ScalarMappable(
                cmap=cmap,
                norm=mcolors.Normalize(
                    vmin=vmin if vmin is not None else plot_data[density_col].min(),
                    vmax=vmax if vmax is not None else plot_data[density_col].max()
                )
            )
            sm._A = []
            cbar = plt.colorbar(sm, ax=ax)
            cbar.set_label(colorbar_label or f"Population density [people/{units_str}]")

        if xlim is not None:
            ax.set_xlim(xlim)
        if ylim is not None:
            ax.set_ylim(ylim)
        if title is not None:
            ax.set_title(title)

        return ax

    def plotPopulation(self, data, populationType="total_pop",
                       ax=None, figsize=(12, 10),
                       cmap="YlOrRd", vmin=None, vmax=None,
                       alpha=1.0, edgecolor="0.5", linewidth=0.3,
                       colorbar=True, colorbar_label=None,
                       title=None,
                       xlim=None, ylim=None,
                       inputCRS=None, outputCRS=ITM):
        """
        Plot absolute population count as a choropleth map.

        Example
        -------
        >>> demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT")
        >>> census = demo.getDataSourceData("census_2020")
        >>> demo.presentation.plotPopulation(census, cmap="Blues")

        Parameters
        ----------
        data : geopandas.GeoDataFrame
            The demographic data with geometry and population columns.
        populationType : str
            Column name for the population count (default: ``"total_pop"``).
        ax : matplotlib.axes.Axes, optional
            Axes to plot on. If None, creates a new figure.
        figsize : tuple of float
            Figure size (width, height) in inches. Default: (12, 10).
        cmap : str or matplotlib.colors.Colormap
            Colormap name or instance. Default: ``"YlOrRd"``.
        vmin : float, optional
            Minimum value for the color scale.
        vmax : float, optional
            Maximum value for the color scale.
        alpha : float
            Transparency (0=transparent, 1=opaque). Default: 1.0.
        edgecolor : str
            Color of polygon edges. Default: ``"0.5"``.
        linewidth : float
            Width of polygon edges. Default: 0.3.
        colorbar : bool
            Whether to show a colorbar. Default: True.
        colorbar_label : str, optional
            Label for the colorbar. If None, uses ``"Population count"``.
        title : str, optional
            Plot title.
        xlim : tuple of float, optional
            (xmin, xmax) limits for the x-axis.
        ylim : tuple of float, optional
            (ymin, ymax) limits for the y-axis.
        inputCRS : int or str, optional
            CRS of the input data if not already set on the GeoDataFrame.
            Use the constants ``WSG84`` (4326) or ``ITM`` (2039) from
            ``hera.measurements.GIS.utils``.
        outputCRS : int or str, optional
            CRS to reproject data before plotting.
            Use the constants ``WSG84`` (4326) or ``ITM`` (2039) from
            ``hera.measurements.GIS.utils``.

        Returns
        -------
        matplotlib.axes.Axes
            The axes with the plot.
        """
        plot_data = data.copy()

        if inputCRS is not None:
            plot_data = plot_data.set_crs(epsg=inputCRS if isinstance(inputCRS, int) else inputCRS, allow_override=True)

        if outputCRS is not None:
            plot_data = plot_data.to_crs(epsg=outputCRS if isinstance(outputCRS, int) else outputCRS)

        if ax is None:
            fig, ax = plt.subplots(1, 1, figsize=figsize)

        plot_data.plot(
            column=populationType,
            ax=ax,
            cmap=cmap,
            vmin=vmin,
            vmax=vmax,
            alpha=alpha,
            edgecolor=edgecolor,
            linewidth=linewidth,
            legend=False
        )

        if colorbar:
            sm = plt.cm.ScalarMappable(
                cmap=cmap,
                norm=mcolors.Normalize(
                    vmin=vmin if vmin is not None else plot_data[populationType].min(),
                    vmax=vmax if vmax is not None else plot_data[populationType].max()
                )
            )
            sm._A = []
            cbar = plt.colorbar(sm, ax=ax)
            cbar.set_label(colorbar_label or "Population count")

        if xlim is not None:
            ax.set_xlim(xlim)
        if ylim is not None:
            ax.set_ylim(ylim)
        if title is not None:
            ax.set_title(title)

        return ax

    def plotPopulationByType(self, data, populationTypes=None,
                             figsize=(16, 10), ncols=3,
                             cmap="YlOrRd", alpha=0.8,
                             edgecolor="0.5", linewidth=0.2,
                             inputCRS=None, outputCRS=ITM):
        """
        Plot multiple population types as a grid of subplots.

        Example
        -------
        >>> demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT")
        >>> census = demo.getDataSourceData("census_2020")
        >>> demo.presentation.plotPopulationByType(census, ncols=3)

        Parameters
        ----------
        data : geopandas.GeoDataFrame
            The demographic data.
        populationTypes : list of str, optional
            Column names to plot. If None, uses all population types
            defined in the toolkit.
        figsize : tuple of float
            Figure size. Default: (16, 10).
        ncols : int
            Number of columns in the subplot grid. Default: 3.
        cmap : str
            Colormap name. Default: ``"YlOrRd"``.
        alpha : float
            Transparency. Default: 0.8.
        edgecolor : str
            Polygon edge color. Default: ``"0.5"``.
        linewidth : float
            Polygon edge width. Default: 0.2.
        inputCRS : int or str, optional
            CRS of the input data if not already set on the GeoDataFrame.
            Use the constants ``WSG84`` (4326) or ``ITM`` (2039) from
            ``hera.measurements.GIS.utils``.
        outputCRS : int or str, optional
            CRS to reproject data before plotting.
            Use the constants ``WSG84`` (4326) or ``ITM`` (2039) from
            ``hera.measurements.GIS.utils``.

        Returns
        -------
        matplotlib.figure.Figure
            The figure with all subplots.
        """
        if populationTypes is None:
            populationTypes = [v for v in self.datalayer.populationTypes.values()
                               if v in data.columns]

        nplots = len(populationTypes)
        nrows = max(1, (nplots + ncols - 1) // ncols)

        fig, axes = plt.subplots(nrows, ncols, figsize=figsize)
        axes = np.atleast_1d(axes).flatten()

        for i, pop_type in enumerate(populationTypes):
            # Find the display name
            display_name = pop_type
            for name, col in self.datalayer.populationTypes.items():
                if col == pop_type:
                    display_name = name
                    break

            self.plotPopulation(
                data, populationType=pop_type,
                ax=axes[i], cmap=cmap, alpha=alpha,
                edgecolor=edgecolor, linewidth=linewidth,
                title=display_name, colorbar=True,
                colorbar_label=pop_type,
                inputCRS=inputCRS, outputCRS=outputCRS
            )

        # Hide unused axes
        for i in range(nplots, len(axes)):
            axes[i].set_visible(False)

        plt.tight_layout()
        return fig

    def plotPopulationInPolygon(self, intersectionResult, queryPolygon=None,
                                 contextData=None,
                                 populationType="total_pop",
                                 ax=None, figsize=(12, 10),
                                 cmap="OrRd", contextColor="0.9",
                                 alpha=0.8, edgecolor="0.3", linewidth=0.5,
                                 colorbar=True, colorbar_label=None,
                                 title=None,
                                 xlim=None, ylim=None,
                                 inputCRS=None, outputCRS=ITM):
        """
        Plot the result of ``calculatePopulationInPolygon``.

        Shows intersected census areas colored by population, optionally
        with the query polygon outlined and surrounding census areas
        as context.

        Example
        -------
        >>> demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT")
        >>> census = demo.getDataSourceData("census_2020")
        >>> result = demo.analysis.calculatePopulationInPolygon(my_polygon, census)
        >>> demo.presentation.plotPopulationInPolygon(result, queryPolygon=my_polygon, contextData=census)

        Parameters
        ----------
        intersectionResult : geopandas.GeoDataFrame
            The result from ``analysis.calculatePopulationInPolygon()``.
        queryPolygon : shapely.geometry, optional
            The query polygon to overlay as a dashed outline.
        contextData : geopandas.GeoDataFrame, optional
            Full census data to show as gray background context.
        populationType : str
            Population column to color by. Default: ``"total_pop"``.
        ax : matplotlib.axes.Axes, optional
            Axes to plot on. If None, creates a new figure.
        figsize : tuple of float
            Figure size. Default: (12, 10).
        cmap : str
            Colormap for affected areas. Default: ``"OrRd"``.
        contextColor : str
            Fill color for context (unaffected) areas. Default: ``"0.9"`` (light gray).
        alpha : float
            Transparency of affected areas. Default: 0.8.
        edgecolor : str
            Polygon edge color. Default: ``"0.3"``.
        linewidth : float
            Polygon edge width. Default: 0.5.
        colorbar : bool
            Whether to show a colorbar. Default: True.
        colorbar_label : str, optional
            Colorbar label.
        title : str, optional
            Plot title.
        xlim, ylim : tuple of float, optional
            Axis limits.
        inputCRS : int or str, optional
            CRS of the input data. Use ``WSG84`` (4326) or ``ITM`` (2039).
        outputCRS : int or str
            CRS to reproject to for plotting. Default: ``ITM``.

        Returns
        -------
        matplotlib.axes.Axes
        """
        if ax is None:
            fig, ax = plt.subplots(1, 1, figsize=figsize)

        # Plot context (full census) in gray
        if contextData is not None:
            ctx = contextData.copy()
            if inputCRS is not None:
                ctx = ctx.set_crs(epsg=inputCRS if isinstance(inputCRS, int) else inputCRS, allow_override=True)
            if outputCRS is not None:
                ctx = ctx.to_crs(epsg=outputCRS if isinstance(outputCRS, int) else outputCRS)
            ctx.plot(ax=ax, color=contextColor, edgecolor="0.7", linewidth=0.2)

        # Plot intersection result
        plot_data = intersectionResult.copy()
        if inputCRS is not None:
            plot_data = plot_data.set_crs(epsg=inputCRS if isinstance(inputCRS, int) else inputCRS, allow_override=True)
        if outputCRS is not None:
            plot_data = plot_data.to_crs(epsg=outputCRS if isinstance(outputCRS, int) else outputCRS)

        if populationType in plot_data.columns:
            plot_data.plot(column=populationType, ax=ax, cmap=cmap, alpha=alpha,
                           edgecolor=edgecolor, linewidth=linewidth, legend=False)
            if colorbar:
                sm = plt.cm.ScalarMappable(
                    cmap=cmap,
                    norm=mcolors.Normalize(
                        vmin=plot_data[populationType].min(),
                        vmax=plot_data[populationType].max()
                    )
                )
                sm._A = []
                cbar = plt.colorbar(sm, ax=ax)
                cbar.set_label(colorbar_label or f"Population ({populationType})")
        else:
            plot_data.plot(ax=ax, color="tomato", alpha=alpha,
                           edgecolor=edgecolor, linewidth=linewidth)

        # Overlay query polygon outline
        if queryPolygon is not None:
            from shapely.geometry import mapping
            import geopandas as _gpd
            poly_gdf = _gpd.GeoDataFrame(geometry=[queryPolygon])
            if inputCRS is not None:
                poly_gdf = poly_gdf.set_crs(epsg=inputCRS if isinstance(inputCRS, int) else inputCRS)
            if outputCRS is not None:
                poly_gdf = poly_gdf.to_crs(epsg=outputCRS if isinstance(outputCRS, int) else outputCRS)
            poly_gdf.boundary.plot(ax=ax, color="blue", linewidth=2, linestyle="--")

        if xlim is not None:
            ax.set_xlim(xlim)
        if ylim is not None:
            ax.set_ylim(ylim)
        if title is not None:
            ax.set_title(title)

        return ax

    def plotArea(self, areaData, contextData=None,
                 populationType="total_pop",
                 ax=None, figsize=(12, 10),
                 areaColor="steelblue", contextColor="0.9",
                 alpha=0.6, edgecolor="0.3", linewidth=1.0,
                 annotate=True, annotate_fontsize=12,
                 title=None,
                 xlim=None, ylim=None,
                 inputCRS=None, outputCRS=ITM):
        """
        Plot a custom area from ``createNewArea`` with population annotation.

        Example
        -------
        >>> demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT")
        >>> census = demo.getDataSourceData("census_2020")
        >>> area = demo.analysis.createNewArea(my_polygon, census)
        >>> demo.presentation.plotArea(area.getData(), contextData=census, annotate=True)

        Parameters
        ----------
        areaData : geopandas.GeoDataFrame
            The result from ``analysis.createNewArea()``.
        contextData : geopandas.GeoDataFrame, optional
            Full census data to show as gray background.
        populationType : str
            Population column to annotate. Default: ``"total_pop"``.
        ax : matplotlib.axes.Axes, optional
            Axes to plot on.
        figsize : tuple of float
            Figure size. Default: (12, 10).
        areaColor : str
            Fill color for the custom area. Default: ``"steelblue"``.
        contextColor : str
            Fill color for context areas. Default: ``"0.9"``.
        alpha : float
            Transparency of the custom area. Default: 0.6.
        edgecolor : str
            Edge color. Default: ``"0.3"``.
        linewidth : float
            Edge width. Default: 1.0.
        annotate : bool
            Whether to annotate with population count. Default: True.
        annotate_fontsize : int
            Font size for the annotation. Default: 12.
        title : str, optional
            Plot title.
        xlim, ylim : tuple of float, optional
            Axis limits.
        inputCRS : int or str, optional
            CRS of the input data. Use ``WSG84`` (4326) or ``ITM`` (2039).
        outputCRS : int or str
            CRS to reproject to. Default: ``ITM``.

        Returns
        -------
        matplotlib.axes.Axes
        """
        if ax is None:
            fig, ax = plt.subplots(1, 1, figsize=figsize)

        # Plot context
        if contextData is not None:
            ctx = contextData.copy()
            if inputCRS is not None:
                ctx = ctx.set_crs(epsg=inputCRS if isinstance(inputCRS, int) else inputCRS, allow_override=True)
            if outputCRS is not None:
                ctx = ctx.to_crs(epsg=outputCRS if isinstance(outputCRS, int) else outputCRS)
            ctx.plot(ax=ax, color=contextColor, edgecolor="0.7", linewidth=0.2)

        # Plot the custom area
        plot_data = areaData.copy()
        if inputCRS is not None:
            plot_data = plot_data.set_crs(epsg=inputCRS if isinstance(inputCRS, int) else inputCRS, allow_override=True)
        if outputCRS is not None:
            plot_data = plot_data.to_crs(epsg=outputCRS if isinstance(outputCRS, int) else outputCRS)

        plot_data.plot(ax=ax, color=areaColor, alpha=alpha,
                       edgecolor=edgecolor, linewidth=linewidth)

        # Annotate with population
        if annotate and populationType in plot_data.columns:
            for _, row in plot_data.iterrows():
                centroid = row.geometry.centroid
                pop_val = row[populationType]
                ax.annotate(
                    f"{pop_val:,.0f}",
                    xy=(centroid.x, centroid.y),
                    ha="center", va="center",
                    fontsize=annotate_fontsize, fontweight="bold",
                    bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8)
                )

        if xlim is not None:
            ax.set_xlim(xlim)
        if ylim is not None:
            ax.set_ylim(ylim)
        if title is not None:
            ax.set_title(title)

        return ax

    def plotPopulationOnMap(self, data, tilesToolkit,
                            populationType="total_pop",
                            density=True,
                            density_units=None,
                            zoomlevel=14,
                            tileServer=None,
                            ax=None, figsize=(14, 12),
                            cmap="YlOrRd",
                            vmin=None, vmax=None,
                            alpha=0.5,
                            edgecolor="0.3", linewidth=0.3,
                            colorbar=True, colorbar_label=None,
                            title=None,
                            inputCRS=None, outputCRS=ITM):
        """
        Overlay population density on a tile server map background.

        Fetches map tiles for the data extent and overlays the population
        choropleth with transparency.

        Example
        -------
        >>> demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT")
        >>> tiles = toolkitHome.getToolkit(toolkitHome.GIS_TILES, projectName="MY_PROJECT")
        >>> census = demo.getDataSourceData("census_2020")
        >>> demo.presentation.plotPopulationOnMap(census, tiles, density=True, alpha=0.5)

        Parameters
        ----------
        data : geopandas.GeoDataFrame
            The demographic data.
        tilesToolkit : TilesToolkit
            An initialized Tiles toolkit for fetching map images.
            Get via ``toolkitHome.getToolkit(toolkitHome.GIS_TILES, ...)``.
        populationType : str
            Population column to plot. Default: ``"total_pop"``.
        density : bool
            If True, plot population density.
            If False, plot absolute counts. Default: True.
        density_units : pint.Unit, optional
            Area unit for density (e.g., ``ureg.km**2``, ``ureg.dunam``,
            ``ureg.hectare``). Default: ``ureg.km**2``.
        zoomlevel : int
            Tile server zoom level (higher = more detail). Default: 14.
        tileServer : str, optional
            Tile server URL or datasource name. If None, uses default.
        ax : matplotlib.axes.Axes, optional
            Axes to plot on.
        figsize : tuple of float
            Figure size. Default: (14, 12).
        cmap : str
            Colormap for the population overlay. Default: ``"YlOrRd"``.
        vmin, vmax : float, optional
            Color scale range.
        alpha : float
            Transparency of the population overlay (0=transparent, 1=opaque).
            Default: 0.5. Lower values show more of the map underneath.
        edgecolor : str
            Polygon edge color. Default: ``"0.3"``.
        linewidth : float
            Polygon edge width. Default: 0.3.
        colorbar : bool
            Whether to show a colorbar. Default: True.
        colorbar_label : str, optional
            Colorbar label.
        title : str, optional
            Plot title.
        inputCRS : int or str, optional
            CRS of the input data. Use ``WSG84`` (4326) or ``ITM`` (2039).
        outputCRS : int or str
            CRS for the output plot. Default: ``ITM``.

        Returns
        -------
        matplotlib.axes.Axes
        """
        plot_data = data.copy()

        if inputCRS is not None:
            plot_data = plot_data.set_crs(epsg=inputCRS if isinstance(inputCRS, int) else inputCRS, allow_override=True)

        # Get bounds in WGS84 for tile fetching
        bounds_wgs84 = plot_data.to_crs(epsg=WSG84).total_bounds  # [minx, miny, maxx, maxy]

        # Fetch map tiles
        tile_img = tilesToolkit.getImageFromCorners(
            minx=bounds_wgs84[0], miny=bounds_wgs84[1],
            maxx=bounds_wgs84[2], maxy=bounds_wgs84[3],
            zoomlevel=zoomlevel,
            tileServer=tileServer,
            inputCRS=WSG84,
            outputCRS=outputCRS if outputCRS is not None else ITM
        )

        # Create figure
        if ax is None:
            fig, ax = plt.subplots(1, 1, figsize=figsize)

        # Plot the map tiles as background
        tilesToolkit.presentation.plot(tile_img, outputCRS=outputCRS if outputCRS is not None else ITM, ax=ax)

        # Reproject population data to output CRS
        if outputCRS is not None:
            plot_data = plot_data.to_crs(epsg=outputCRS if isinstance(outputCRS, int) else outputCRS)

        # Compute density if requested
        if density:
            if density_units is None:
                density_units = ureg.km ** 2
            area_m2 = plot_data.geometry.area
            conversion_factor = (1.0 * ureg.m ** 2).to(density_units).magnitude
            area_in_units = area_m2 * conversion_factor
            plot_col = f"{populationType}_density"
            plot_data[plot_col] = (plot_data[populationType] / area_in_units).replace([np.inf, -np.inf], 0).fillna(0)
            units_str = f"{density_units:~P}"
        else:
            plot_col = populationType
            units_str = None

        # Overlay population choropleth
        plot_data.plot(
            column=plot_col, ax=ax,
            cmap=cmap, vmin=vmin, vmax=vmax,
            alpha=alpha,
            edgecolor=edgecolor, linewidth=linewidth,
            legend=False
        )

        if colorbar:
            if colorbar_label:
                label = colorbar_label
            elif density and units_str:
                label = f"Population density [people/{units_str}]"
            else:
                label = "Population count"
            sm = plt.cm.ScalarMappable(
                cmap=cmap,
                norm=mcolors.Normalize(
                    vmin=vmin if vmin is not None else plot_data[plot_col].min(),
                    vmax=vmax if vmax is not None else plot_data[plot_col].max()
                )
            )
            sm._A = []
            cbar = plt.colorbar(sm, ax=ax)
            cbar.set_label(label)

        if title is not None:
            ax.set_title(title)

        return ax

datalayer property

DemographyToolkit : Reference to the parent toolkit.

__init__(dataLayer)

Initialize the presentation layer.

Parameters:

Name Type Description Default
dataLayer DemographyToolkit

The parent demography toolkit instance.

required
Source code in hera/measurements/GIS/vector/demography.py
def __init__(self, dataLayer):
    """
    Initialize the presentation layer.

    Parameters
    ----------
    dataLayer : DemographyToolkit
        The parent demography toolkit instance.
    """
    self._datalayer = dataLayer

plotPopulationDensity(data, populationType='total_pop', density_units=None, ax=None, figsize=(12, 10), cmap='YlOrRd', vmin=None, vmax=None, alpha=1.0, edgecolor='0.5', linewidth=0.3, colorbar=True, colorbar_label=None, title=None, xlim=None, ylim=None, inputCRS=None, outputCRS=ITM)

Plot population density as a choropleth map.

Computes density as population / area for each polygon and plots a colored map.

Example

from hera import toolkitHome demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT") census = demo.getDataSourceData("census_2020") demo.presentation.plotPopulationDensity(census, density_units=ureg.km**2)

Parameters:

Name Type Description Default
data GeoDataFrame

The demographic data with geometry and population columns.

required
populationType str

Column name for the population count (default: "total_pop").

'total_pop'
ax Axes

Axes to plot on. If None, creates a new figure.

None
figsize tuple of float

Figure size (width, height) in inches. Default: (12, 10).

(12, 10)
cmap str or Colormap

Colormap name or instance. Default: "YlOrRd".

'YlOrRd'
vmin float

Minimum value for the color scale. If None, auto-detected.

None
vmax float

Maximum value for the color scale. If None, auto-detected.

None
alpha float

Transparency of the polygons (0=transparent, 1=opaque). Default: 1.0.

1.0
edgecolor str

Color of polygon edges. Default: "0.5" (gray).

'0.5'
linewidth float

Width of polygon edges. Default: 0.3.

0.3
colorbar bool

Whether to show a colorbar. Default: True.

True
density_units Unit

The area unit for density calculation. The CRS determines the native area unit (e.g., m² for ITM). This parameter sets the denominator unit for the density display. Default: ureg.km**2 (people/km²). Examples: ureg.km**2, ureg.m**2, ureg.dunam (1000 m²), ureg.hectare.

None
colorbar_label str

Label for the colorbar. If None, auto-generated from density_units.

None
title str

Plot title.

None
xlim tuple of float

(xmin, xmax) limits for the x-axis. If None, auto from data.

None
ylim tuple of float

(ymin, ymax) limits for the y-axis. If None, auto from data.

None
inputCRS int or str

CRS of the input data if not already set on the GeoDataFrame. Use the constants WSG84 (4326) or ITM (2039) from hera.measurements.GIS.utils.

None
outputCRS int or str

CRS to reproject data before plotting. Use the constants WSG84 (4326) or ITM (2039) from hera.measurements.GIS.utils.

ITM

Returns:

Type Description
Axes

The axes with the plot.

Source code in hera/measurements/GIS/vector/demography.py
def plotPopulationDensity(self, data, populationType="total_pop",
                          density_units=None,
                          ax=None, figsize=(12, 10),
                          cmap="YlOrRd", vmin=None, vmax=None,
                          alpha=1.0, edgecolor="0.5", linewidth=0.3,
                          colorbar=True, colorbar_label=None,
                          title=None,
                          xlim=None, ylim=None,
                          inputCRS=None, outputCRS=ITM):
    """
    Plot population density as a choropleth map.

    Computes density as population / area for each polygon
    and plots a colored map.

    Example
    -------
    >>> from hera import toolkitHome
    >>> demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT")
    >>> census = demo.getDataSourceData("census_2020")
    >>> demo.presentation.plotPopulationDensity(census, density_units=ureg.km**2)

    Parameters
    ----------
    data : geopandas.GeoDataFrame
        The demographic data with geometry and population columns.
    populationType : str
        Column name for the population count (default: ``"total_pop"``).
    ax : matplotlib.axes.Axes, optional
        Axes to plot on. If None, creates a new figure.
    figsize : tuple of float
        Figure size (width, height) in inches. Default: (12, 10).
    cmap : str or matplotlib.colors.Colormap
        Colormap name or instance. Default: ``"YlOrRd"``.
    vmin : float, optional
        Minimum value for the color scale. If None, auto-detected.
    vmax : float, optional
        Maximum value for the color scale. If None, auto-detected.
    alpha : float
        Transparency of the polygons (0=transparent, 1=opaque). Default: 1.0.
    edgecolor : str
        Color of polygon edges. Default: ``"0.5"`` (gray).
    linewidth : float
        Width of polygon edges. Default: 0.3.
    colorbar : bool
        Whether to show a colorbar. Default: True.
    density_units : pint.Unit, optional
        The area unit for density calculation. The CRS determines the
        native area unit (e.g., m² for ITM). This parameter sets the
        denominator unit for the density display. Default: ``ureg.km**2``
        (people/km²). Examples: ``ureg.km**2``, ``ureg.m**2``,
        ``ureg.dunam`` (1000 m²), ``ureg.hectare``.
    colorbar_label : str, optional
        Label for the colorbar. If None, auto-generated from ``density_units``.
    title : str, optional
        Plot title.
    xlim : tuple of float, optional
        (xmin, xmax) limits for the x-axis. If None, auto from data.
    ylim : tuple of float, optional
        (ymin, ymax) limits for the y-axis. If None, auto from data.
    inputCRS : int or str, optional
        CRS of the input data if not already set on the GeoDataFrame.
        Use the constants ``WSG84`` (4326) or ``ITM`` (2039) from
        ``hera.measurements.GIS.utils``.
    outputCRS : int or str, optional
        CRS to reproject data before plotting.
        Use the constants ``WSG84`` (4326) or ``ITM`` (2039) from
        ``hera.measurements.GIS.utils``.

    Returns
    -------
    matplotlib.axes.Axes
        The axes with the plot.
    """
    if density_units is None:
        density_units = ureg.km ** 2

    plot_data = data.copy()

    if inputCRS is not None:
        plot_data = plot_data.set_crs(epsg=inputCRS if isinstance(inputCRS, int) else inputCRS, allow_override=True)

    if outputCRS is not None:
        plot_data = plot_data.to_crs(epsg=outputCRS if isinstance(outputCRS, int) else outputCRS)

    # Geometry area is in CRS native units (m² for ITM).
    # Convert to the requested density_units using pint.
    area_m2 = plot_data.geometry.area  # in m² (for projected CRS like ITM)
    conversion_factor = (1.0 * ureg.m ** 2).to(density_units).magnitude
    area_in_units = area_m2 * conversion_factor

    density_col = f"{populationType}_density"
    plot_data[density_col] = plot_data[populationType] / area_in_units
    plot_data[density_col] = plot_data[density_col].replace([np.inf, -np.inf], 0).fillna(0)

    # Build default colorbar label from units
    units_str = f"{density_units:~P}"  # pint short format, e.g. "km²"

    if ax is None:
        fig, ax = plt.subplots(1, 1, figsize=figsize)

    plot_data.plot(
        column=density_col,
        ax=ax,
        cmap=cmap,
        vmin=vmin,
        vmax=vmax,
        alpha=alpha,
        edgecolor=edgecolor,
        linewidth=linewidth,
        legend=False
    )

    if colorbar:
        sm = plt.cm.ScalarMappable(
            cmap=cmap,
            norm=mcolors.Normalize(
                vmin=vmin if vmin is not None else plot_data[density_col].min(),
                vmax=vmax if vmax is not None else plot_data[density_col].max()
            )
        )
        sm._A = []
        cbar = plt.colorbar(sm, ax=ax)
        cbar.set_label(colorbar_label or f"Population density [people/{units_str}]")

    if xlim is not None:
        ax.set_xlim(xlim)
    if ylim is not None:
        ax.set_ylim(ylim)
    if title is not None:
        ax.set_title(title)

    return ax

plotPopulation(data, populationType='total_pop', ax=None, figsize=(12, 10), cmap='YlOrRd', vmin=None, vmax=None, alpha=1.0, edgecolor='0.5', linewidth=0.3, colorbar=True, colorbar_label=None, title=None, xlim=None, ylim=None, inputCRS=None, outputCRS=ITM)

Plot absolute population count as a choropleth map.

Example

demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT") census = demo.getDataSourceData("census_2020") demo.presentation.plotPopulation(census, cmap="Blues")

Parameters:

Name Type Description Default
data GeoDataFrame

The demographic data with geometry and population columns.

required
populationType str

Column name for the population count (default: "total_pop").

'total_pop'
ax Axes

Axes to plot on. If None, creates a new figure.

None
figsize tuple of float

Figure size (width, height) in inches. Default: (12, 10).

(12, 10)
cmap str or Colormap

Colormap name or instance. Default: "YlOrRd".

'YlOrRd'
vmin float

Minimum value for the color scale.

None
vmax float

Maximum value for the color scale.

None
alpha float

Transparency (0=transparent, 1=opaque). Default: 1.0.

1.0
edgecolor str

Color of polygon edges. Default: "0.5".

'0.5'
linewidth float

Width of polygon edges. Default: 0.3.

0.3
colorbar bool

Whether to show a colorbar. Default: True.

True
colorbar_label str

Label for the colorbar. If None, uses "Population count".

None
title str

Plot title.

None
xlim tuple of float

(xmin, xmax) limits for the x-axis.

None
ylim tuple of float

(ymin, ymax) limits for the y-axis.

None
inputCRS int or str

CRS of the input data if not already set on the GeoDataFrame. Use the constants WSG84 (4326) or ITM (2039) from hera.measurements.GIS.utils.

None
outputCRS int or str

CRS to reproject data before plotting. Use the constants WSG84 (4326) or ITM (2039) from hera.measurements.GIS.utils.

ITM

Returns:

Type Description
Axes

The axes with the plot.

Source code in hera/measurements/GIS/vector/demography.py
def plotPopulation(self, data, populationType="total_pop",
                   ax=None, figsize=(12, 10),
                   cmap="YlOrRd", vmin=None, vmax=None,
                   alpha=1.0, edgecolor="0.5", linewidth=0.3,
                   colorbar=True, colorbar_label=None,
                   title=None,
                   xlim=None, ylim=None,
                   inputCRS=None, outputCRS=ITM):
    """
    Plot absolute population count as a choropleth map.

    Example
    -------
    >>> demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT")
    >>> census = demo.getDataSourceData("census_2020")
    >>> demo.presentation.plotPopulation(census, cmap="Blues")

    Parameters
    ----------
    data : geopandas.GeoDataFrame
        The demographic data with geometry and population columns.
    populationType : str
        Column name for the population count (default: ``"total_pop"``).
    ax : matplotlib.axes.Axes, optional
        Axes to plot on. If None, creates a new figure.
    figsize : tuple of float
        Figure size (width, height) in inches. Default: (12, 10).
    cmap : str or matplotlib.colors.Colormap
        Colormap name or instance. Default: ``"YlOrRd"``.
    vmin : float, optional
        Minimum value for the color scale.
    vmax : float, optional
        Maximum value for the color scale.
    alpha : float
        Transparency (0=transparent, 1=opaque). Default: 1.0.
    edgecolor : str
        Color of polygon edges. Default: ``"0.5"``.
    linewidth : float
        Width of polygon edges. Default: 0.3.
    colorbar : bool
        Whether to show a colorbar. Default: True.
    colorbar_label : str, optional
        Label for the colorbar. If None, uses ``"Population count"``.
    title : str, optional
        Plot title.
    xlim : tuple of float, optional
        (xmin, xmax) limits for the x-axis.
    ylim : tuple of float, optional
        (ymin, ymax) limits for the y-axis.
    inputCRS : int or str, optional
        CRS of the input data if not already set on the GeoDataFrame.
        Use the constants ``WSG84`` (4326) or ``ITM`` (2039) from
        ``hera.measurements.GIS.utils``.
    outputCRS : int or str, optional
        CRS to reproject data before plotting.
        Use the constants ``WSG84`` (4326) or ``ITM`` (2039) from
        ``hera.measurements.GIS.utils``.

    Returns
    -------
    matplotlib.axes.Axes
        The axes with the plot.
    """
    plot_data = data.copy()

    if inputCRS is not None:
        plot_data = plot_data.set_crs(epsg=inputCRS if isinstance(inputCRS, int) else inputCRS, allow_override=True)

    if outputCRS is not None:
        plot_data = plot_data.to_crs(epsg=outputCRS if isinstance(outputCRS, int) else outputCRS)

    if ax is None:
        fig, ax = plt.subplots(1, 1, figsize=figsize)

    plot_data.plot(
        column=populationType,
        ax=ax,
        cmap=cmap,
        vmin=vmin,
        vmax=vmax,
        alpha=alpha,
        edgecolor=edgecolor,
        linewidth=linewidth,
        legend=False
    )

    if colorbar:
        sm = plt.cm.ScalarMappable(
            cmap=cmap,
            norm=mcolors.Normalize(
                vmin=vmin if vmin is not None else plot_data[populationType].min(),
                vmax=vmax if vmax is not None else plot_data[populationType].max()
            )
        )
        sm._A = []
        cbar = plt.colorbar(sm, ax=ax)
        cbar.set_label(colorbar_label or "Population count")

    if xlim is not None:
        ax.set_xlim(xlim)
    if ylim is not None:
        ax.set_ylim(ylim)
    if title is not None:
        ax.set_title(title)

    return ax

plotPopulationByType(data, populationTypes=None, figsize=(16, 10), ncols=3, cmap='YlOrRd', alpha=0.8, edgecolor='0.5', linewidth=0.2, inputCRS=None, outputCRS=ITM)

Plot multiple population types as a grid of subplots.

Example

demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT") census = demo.getDataSourceData("census_2020") demo.presentation.plotPopulationByType(census, ncols=3)

Parameters:

Name Type Description Default
data GeoDataFrame

The demographic data.

required
populationTypes list of str

Column names to plot. If None, uses all population types defined in the toolkit.

None
figsize tuple of float

Figure size. Default: (16, 10).

(16, 10)
ncols int

Number of columns in the subplot grid. Default: 3.

3
cmap str

Colormap name. Default: "YlOrRd".

'YlOrRd'
alpha float

Transparency. Default: 0.8.

0.8
edgecolor str

Polygon edge color. Default: "0.5".

'0.5'
linewidth float

Polygon edge width. Default: 0.2.

0.2
inputCRS int or str

CRS of the input data if not already set on the GeoDataFrame. Use the constants WSG84 (4326) or ITM (2039) from hera.measurements.GIS.utils.

None
outputCRS int or str

CRS to reproject data before plotting. Use the constants WSG84 (4326) or ITM (2039) from hera.measurements.GIS.utils.

ITM

Returns:

Type Description
Figure

The figure with all subplots.

Source code in hera/measurements/GIS/vector/demography.py
def plotPopulationByType(self, data, populationTypes=None,
                         figsize=(16, 10), ncols=3,
                         cmap="YlOrRd", alpha=0.8,
                         edgecolor="0.5", linewidth=0.2,
                         inputCRS=None, outputCRS=ITM):
    """
    Plot multiple population types as a grid of subplots.

    Example
    -------
    >>> demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT")
    >>> census = demo.getDataSourceData("census_2020")
    >>> demo.presentation.plotPopulationByType(census, ncols=3)

    Parameters
    ----------
    data : geopandas.GeoDataFrame
        The demographic data.
    populationTypes : list of str, optional
        Column names to plot. If None, uses all population types
        defined in the toolkit.
    figsize : tuple of float
        Figure size. Default: (16, 10).
    ncols : int
        Number of columns in the subplot grid. Default: 3.
    cmap : str
        Colormap name. Default: ``"YlOrRd"``.
    alpha : float
        Transparency. Default: 0.8.
    edgecolor : str
        Polygon edge color. Default: ``"0.5"``.
    linewidth : float
        Polygon edge width. Default: 0.2.
    inputCRS : int or str, optional
        CRS of the input data if not already set on the GeoDataFrame.
        Use the constants ``WSG84`` (4326) or ``ITM`` (2039) from
        ``hera.measurements.GIS.utils``.
    outputCRS : int or str, optional
        CRS to reproject data before plotting.
        Use the constants ``WSG84`` (4326) or ``ITM`` (2039) from
        ``hera.measurements.GIS.utils``.

    Returns
    -------
    matplotlib.figure.Figure
        The figure with all subplots.
    """
    if populationTypes is None:
        populationTypes = [v for v in self.datalayer.populationTypes.values()
                           if v in data.columns]

    nplots = len(populationTypes)
    nrows = max(1, (nplots + ncols - 1) // ncols)

    fig, axes = plt.subplots(nrows, ncols, figsize=figsize)
    axes = np.atleast_1d(axes).flatten()

    for i, pop_type in enumerate(populationTypes):
        # Find the display name
        display_name = pop_type
        for name, col in self.datalayer.populationTypes.items():
            if col == pop_type:
                display_name = name
                break

        self.plotPopulation(
            data, populationType=pop_type,
            ax=axes[i], cmap=cmap, alpha=alpha,
            edgecolor=edgecolor, linewidth=linewidth,
            title=display_name, colorbar=True,
            colorbar_label=pop_type,
            inputCRS=inputCRS, outputCRS=outputCRS
        )

    # Hide unused axes
    for i in range(nplots, len(axes)):
        axes[i].set_visible(False)

    plt.tight_layout()
    return fig

plotPopulationInPolygon(intersectionResult, queryPolygon=None, contextData=None, populationType='total_pop', ax=None, figsize=(12, 10), cmap='OrRd', contextColor='0.9', alpha=0.8, edgecolor='0.3', linewidth=0.5, colorbar=True, colorbar_label=None, title=None, xlim=None, ylim=None, inputCRS=None, outputCRS=ITM)

Plot the result of calculatePopulationInPolygon.

Shows intersected census areas colored by population, optionally with the query polygon outlined and surrounding census areas as context.

Example

demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT") census = demo.getDataSourceData("census_2020") result = demo.analysis.calculatePopulationInPolygon(my_polygon, census) demo.presentation.plotPopulationInPolygon(result, queryPolygon=my_polygon, contextData=census)

Parameters:

Name Type Description Default
intersectionResult GeoDataFrame

The result from analysis.calculatePopulationInPolygon().

required
queryPolygon geometry

The query polygon to overlay as a dashed outline.

None
contextData GeoDataFrame

Full census data to show as gray background context.

None
populationType str

Population column to color by. Default: "total_pop".

'total_pop'
ax Axes

Axes to plot on. If None, creates a new figure.

None
figsize tuple of float

Figure size. Default: (12, 10).

(12, 10)
cmap str

Colormap for affected areas. Default: "OrRd".

'OrRd'
contextColor str

Fill color for context (unaffected) areas. Default: "0.9" (light gray).

'0.9'
alpha float

Transparency of affected areas. Default: 0.8.

0.8
edgecolor str

Polygon edge color. Default: "0.3".

'0.3'
linewidth float

Polygon edge width. Default: 0.5.

0.5
colorbar bool

Whether to show a colorbar. Default: True.

True
colorbar_label str

Colorbar label.

None
title str

Plot title.

None
xlim tuple of float

Axis limits.

None
ylim tuple of float

Axis limits.

None
inputCRS int or str

CRS of the input data. Use WSG84 (4326) or ITM (2039).

None
outputCRS int or str

CRS to reproject to for plotting. Default: ITM.

ITM

Returns:

Type Description
Axes
Source code in hera/measurements/GIS/vector/demography.py
def plotPopulationInPolygon(self, intersectionResult, queryPolygon=None,
                             contextData=None,
                             populationType="total_pop",
                             ax=None, figsize=(12, 10),
                             cmap="OrRd", contextColor="0.9",
                             alpha=0.8, edgecolor="0.3", linewidth=0.5,
                             colorbar=True, colorbar_label=None,
                             title=None,
                             xlim=None, ylim=None,
                             inputCRS=None, outputCRS=ITM):
    """
    Plot the result of ``calculatePopulationInPolygon``.

    Shows intersected census areas colored by population, optionally
    with the query polygon outlined and surrounding census areas
    as context.

    Example
    -------
    >>> demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT")
    >>> census = demo.getDataSourceData("census_2020")
    >>> result = demo.analysis.calculatePopulationInPolygon(my_polygon, census)
    >>> demo.presentation.plotPopulationInPolygon(result, queryPolygon=my_polygon, contextData=census)

    Parameters
    ----------
    intersectionResult : geopandas.GeoDataFrame
        The result from ``analysis.calculatePopulationInPolygon()``.
    queryPolygon : shapely.geometry, optional
        The query polygon to overlay as a dashed outline.
    contextData : geopandas.GeoDataFrame, optional
        Full census data to show as gray background context.
    populationType : str
        Population column to color by. Default: ``"total_pop"``.
    ax : matplotlib.axes.Axes, optional
        Axes to plot on. If None, creates a new figure.
    figsize : tuple of float
        Figure size. Default: (12, 10).
    cmap : str
        Colormap for affected areas. Default: ``"OrRd"``.
    contextColor : str
        Fill color for context (unaffected) areas. Default: ``"0.9"`` (light gray).
    alpha : float
        Transparency of affected areas. Default: 0.8.
    edgecolor : str
        Polygon edge color. Default: ``"0.3"``.
    linewidth : float
        Polygon edge width. Default: 0.5.
    colorbar : bool
        Whether to show a colorbar. Default: True.
    colorbar_label : str, optional
        Colorbar label.
    title : str, optional
        Plot title.
    xlim, ylim : tuple of float, optional
        Axis limits.
    inputCRS : int or str, optional
        CRS of the input data. Use ``WSG84`` (4326) or ``ITM`` (2039).
    outputCRS : int or str
        CRS to reproject to for plotting. Default: ``ITM``.

    Returns
    -------
    matplotlib.axes.Axes
    """
    if ax is None:
        fig, ax = plt.subplots(1, 1, figsize=figsize)

    # Plot context (full census) in gray
    if contextData is not None:
        ctx = contextData.copy()
        if inputCRS is not None:
            ctx = ctx.set_crs(epsg=inputCRS if isinstance(inputCRS, int) else inputCRS, allow_override=True)
        if outputCRS is not None:
            ctx = ctx.to_crs(epsg=outputCRS if isinstance(outputCRS, int) else outputCRS)
        ctx.plot(ax=ax, color=contextColor, edgecolor="0.7", linewidth=0.2)

    # Plot intersection result
    plot_data = intersectionResult.copy()
    if inputCRS is not None:
        plot_data = plot_data.set_crs(epsg=inputCRS if isinstance(inputCRS, int) else inputCRS, allow_override=True)
    if outputCRS is not None:
        plot_data = plot_data.to_crs(epsg=outputCRS if isinstance(outputCRS, int) else outputCRS)

    if populationType in plot_data.columns:
        plot_data.plot(column=populationType, ax=ax, cmap=cmap, alpha=alpha,
                       edgecolor=edgecolor, linewidth=linewidth, legend=False)
        if colorbar:
            sm = plt.cm.ScalarMappable(
                cmap=cmap,
                norm=mcolors.Normalize(
                    vmin=plot_data[populationType].min(),
                    vmax=plot_data[populationType].max()
                )
            )
            sm._A = []
            cbar = plt.colorbar(sm, ax=ax)
            cbar.set_label(colorbar_label or f"Population ({populationType})")
    else:
        plot_data.plot(ax=ax, color="tomato", alpha=alpha,
                       edgecolor=edgecolor, linewidth=linewidth)

    # Overlay query polygon outline
    if queryPolygon is not None:
        from shapely.geometry import mapping
        import geopandas as _gpd
        poly_gdf = _gpd.GeoDataFrame(geometry=[queryPolygon])
        if inputCRS is not None:
            poly_gdf = poly_gdf.set_crs(epsg=inputCRS if isinstance(inputCRS, int) else inputCRS)
        if outputCRS is not None:
            poly_gdf = poly_gdf.to_crs(epsg=outputCRS if isinstance(outputCRS, int) else outputCRS)
        poly_gdf.boundary.plot(ax=ax, color="blue", linewidth=2, linestyle="--")

    if xlim is not None:
        ax.set_xlim(xlim)
    if ylim is not None:
        ax.set_ylim(ylim)
    if title is not None:
        ax.set_title(title)

    return ax

plotArea(areaData, contextData=None, populationType='total_pop', ax=None, figsize=(12, 10), areaColor='steelblue', contextColor='0.9', alpha=0.6, edgecolor='0.3', linewidth=1.0, annotate=True, annotate_fontsize=12, title=None, xlim=None, ylim=None, inputCRS=None, outputCRS=ITM)

Plot a custom area from createNewArea with population annotation.

Example

demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT") census = demo.getDataSourceData("census_2020") area = demo.analysis.createNewArea(my_polygon, census) demo.presentation.plotArea(area.getData(), contextData=census, annotate=True)

Parameters:

Name Type Description Default
areaData GeoDataFrame

The result from analysis.createNewArea().

required
contextData GeoDataFrame

Full census data to show as gray background.

None
populationType str

Population column to annotate. Default: "total_pop".

'total_pop'
ax Axes

Axes to plot on.

None
figsize tuple of float

Figure size. Default: (12, 10).

(12, 10)
areaColor str

Fill color for the custom area. Default: "steelblue".

'steelblue'
contextColor str

Fill color for context areas. Default: "0.9".

'0.9'
alpha float

Transparency of the custom area. Default: 0.6.

0.6
edgecolor str

Edge color. Default: "0.3".

'0.3'
linewidth float

Edge width. Default: 1.0.

1.0
annotate bool

Whether to annotate with population count. Default: True.

True
annotate_fontsize int

Font size for the annotation. Default: 12.

12
title str

Plot title.

None
xlim tuple of float

Axis limits.

None
ylim tuple of float

Axis limits.

None
inputCRS int or str

CRS of the input data. Use WSG84 (4326) or ITM (2039).

None
outputCRS int or str

CRS to reproject to. Default: ITM.

ITM

Returns:

Type Description
Axes
Source code in hera/measurements/GIS/vector/demography.py
def plotArea(self, areaData, contextData=None,
             populationType="total_pop",
             ax=None, figsize=(12, 10),
             areaColor="steelblue", contextColor="0.9",
             alpha=0.6, edgecolor="0.3", linewidth=1.0,
             annotate=True, annotate_fontsize=12,
             title=None,
             xlim=None, ylim=None,
             inputCRS=None, outputCRS=ITM):
    """
    Plot a custom area from ``createNewArea`` with population annotation.

    Example
    -------
    >>> demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT")
    >>> census = demo.getDataSourceData("census_2020")
    >>> area = demo.analysis.createNewArea(my_polygon, census)
    >>> demo.presentation.plotArea(area.getData(), contextData=census, annotate=True)

    Parameters
    ----------
    areaData : geopandas.GeoDataFrame
        The result from ``analysis.createNewArea()``.
    contextData : geopandas.GeoDataFrame, optional
        Full census data to show as gray background.
    populationType : str
        Population column to annotate. Default: ``"total_pop"``.
    ax : matplotlib.axes.Axes, optional
        Axes to plot on.
    figsize : tuple of float
        Figure size. Default: (12, 10).
    areaColor : str
        Fill color for the custom area. Default: ``"steelblue"``.
    contextColor : str
        Fill color for context areas. Default: ``"0.9"``.
    alpha : float
        Transparency of the custom area. Default: 0.6.
    edgecolor : str
        Edge color. Default: ``"0.3"``.
    linewidth : float
        Edge width. Default: 1.0.
    annotate : bool
        Whether to annotate with population count. Default: True.
    annotate_fontsize : int
        Font size for the annotation. Default: 12.
    title : str, optional
        Plot title.
    xlim, ylim : tuple of float, optional
        Axis limits.
    inputCRS : int or str, optional
        CRS of the input data. Use ``WSG84`` (4326) or ``ITM`` (2039).
    outputCRS : int or str
        CRS to reproject to. Default: ``ITM``.

    Returns
    -------
    matplotlib.axes.Axes
    """
    if ax is None:
        fig, ax = plt.subplots(1, 1, figsize=figsize)

    # Plot context
    if contextData is not None:
        ctx = contextData.copy()
        if inputCRS is not None:
            ctx = ctx.set_crs(epsg=inputCRS if isinstance(inputCRS, int) else inputCRS, allow_override=True)
        if outputCRS is not None:
            ctx = ctx.to_crs(epsg=outputCRS if isinstance(outputCRS, int) else outputCRS)
        ctx.plot(ax=ax, color=contextColor, edgecolor="0.7", linewidth=0.2)

    # Plot the custom area
    plot_data = areaData.copy()
    if inputCRS is not None:
        plot_data = plot_data.set_crs(epsg=inputCRS if isinstance(inputCRS, int) else inputCRS, allow_override=True)
    if outputCRS is not None:
        plot_data = plot_data.to_crs(epsg=outputCRS if isinstance(outputCRS, int) else outputCRS)

    plot_data.plot(ax=ax, color=areaColor, alpha=alpha,
                   edgecolor=edgecolor, linewidth=linewidth)

    # Annotate with population
    if annotate and populationType in plot_data.columns:
        for _, row in plot_data.iterrows():
            centroid = row.geometry.centroid
            pop_val = row[populationType]
            ax.annotate(
                f"{pop_val:,.0f}",
                xy=(centroid.x, centroid.y),
                ha="center", va="center",
                fontsize=annotate_fontsize, fontweight="bold",
                bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8)
            )

    if xlim is not None:
        ax.set_xlim(xlim)
    if ylim is not None:
        ax.set_ylim(ylim)
    if title is not None:
        ax.set_title(title)

    return ax

plotPopulationOnMap(data, tilesToolkit, populationType='total_pop', density=True, density_units=None, zoomlevel=14, tileServer=None, ax=None, figsize=(14, 12), cmap='YlOrRd', vmin=None, vmax=None, alpha=0.5, edgecolor='0.3', linewidth=0.3, colorbar=True, colorbar_label=None, title=None, inputCRS=None, outputCRS=ITM)

Overlay population density on a tile server map background.

Fetches map tiles for the data extent and overlays the population choropleth with transparency.

Example

demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT") tiles = toolkitHome.getToolkit(toolkitHome.GIS_TILES, projectName="MY_PROJECT") census = demo.getDataSourceData("census_2020") demo.presentation.plotPopulationOnMap(census, tiles, density=True, alpha=0.5)

Parameters:

Name Type Description Default
data GeoDataFrame

The demographic data.

required
tilesToolkit TilesToolkit

An initialized Tiles toolkit for fetching map images. Get via toolkitHome.getToolkit(toolkitHome.GIS_TILES, ...).

required
populationType str

Population column to plot. Default: "total_pop".

'total_pop'
density bool

If True, plot population density. If False, plot absolute counts. Default: True.

True
density_units Unit

Area unit for density (e.g., ureg.km**2, ureg.dunam, ureg.hectare). Default: ureg.km**2.

None
zoomlevel int

Tile server zoom level (higher = more detail). Default: 14.

14
tileServer str

Tile server URL or datasource name. If None, uses default.

None
ax Axes

Axes to plot on.

None
figsize tuple of float

Figure size. Default: (14, 12).

(14, 12)
cmap str

Colormap for the population overlay. Default: "YlOrRd".

'YlOrRd'
vmin float

Color scale range.

None
vmax float

Color scale range.

None
alpha float

Transparency of the population overlay (0=transparent, 1=opaque). Default: 0.5. Lower values show more of the map underneath.

0.5
edgecolor str

Polygon edge color. Default: "0.3".

'0.3'
linewidth float

Polygon edge width. Default: 0.3.

0.3
colorbar bool

Whether to show a colorbar. Default: True.

True
colorbar_label str

Colorbar label.

None
title str

Plot title.

None
inputCRS int or str

CRS of the input data. Use WSG84 (4326) or ITM (2039).

None
outputCRS int or str

CRS for the output plot. Default: ITM.

ITM

Returns:

Type Description
Axes
Source code in hera/measurements/GIS/vector/demography.py
def plotPopulationOnMap(self, data, tilesToolkit,
                        populationType="total_pop",
                        density=True,
                        density_units=None,
                        zoomlevel=14,
                        tileServer=None,
                        ax=None, figsize=(14, 12),
                        cmap="YlOrRd",
                        vmin=None, vmax=None,
                        alpha=0.5,
                        edgecolor="0.3", linewidth=0.3,
                        colorbar=True, colorbar_label=None,
                        title=None,
                        inputCRS=None, outputCRS=ITM):
    """
    Overlay population density on a tile server map background.

    Fetches map tiles for the data extent and overlays the population
    choropleth with transparency.

    Example
    -------
    >>> demo = toolkitHome.getToolkit(toolkitHome.GIS_DEMOGRAPHY, projectName="MY_PROJECT")
    >>> tiles = toolkitHome.getToolkit(toolkitHome.GIS_TILES, projectName="MY_PROJECT")
    >>> census = demo.getDataSourceData("census_2020")
    >>> demo.presentation.plotPopulationOnMap(census, tiles, density=True, alpha=0.5)

    Parameters
    ----------
    data : geopandas.GeoDataFrame
        The demographic data.
    tilesToolkit : TilesToolkit
        An initialized Tiles toolkit for fetching map images.
        Get via ``toolkitHome.getToolkit(toolkitHome.GIS_TILES, ...)``.
    populationType : str
        Population column to plot. Default: ``"total_pop"``.
    density : bool
        If True, plot population density.
        If False, plot absolute counts. Default: True.
    density_units : pint.Unit, optional
        Area unit for density (e.g., ``ureg.km**2``, ``ureg.dunam``,
        ``ureg.hectare``). Default: ``ureg.km**2``.
    zoomlevel : int
        Tile server zoom level (higher = more detail). Default: 14.
    tileServer : str, optional
        Tile server URL or datasource name. If None, uses default.
    ax : matplotlib.axes.Axes, optional
        Axes to plot on.
    figsize : tuple of float
        Figure size. Default: (14, 12).
    cmap : str
        Colormap for the population overlay. Default: ``"YlOrRd"``.
    vmin, vmax : float, optional
        Color scale range.
    alpha : float
        Transparency of the population overlay (0=transparent, 1=opaque).
        Default: 0.5. Lower values show more of the map underneath.
    edgecolor : str
        Polygon edge color. Default: ``"0.3"``.
    linewidth : float
        Polygon edge width. Default: 0.3.
    colorbar : bool
        Whether to show a colorbar. Default: True.
    colorbar_label : str, optional
        Colorbar label.
    title : str, optional
        Plot title.
    inputCRS : int or str, optional
        CRS of the input data. Use ``WSG84`` (4326) or ``ITM`` (2039).
    outputCRS : int or str
        CRS for the output plot. Default: ``ITM``.

    Returns
    -------
    matplotlib.axes.Axes
    """
    plot_data = data.copy()

    if inputCRS is not None:
        plot_data = plot_data.set_crs(epsg=inputCRS if isinstance(inputCRS, int) else inputCRS, allow_override=True)

    # Get bounds in WGS84 for tile fetching
    bounds_wgs84 = plot_data.to_crs(epsg=WSG84).total_bounds  # [minx, miny, maxx, maxy]

    # Fetch map tiles
    tile_img = tilesToolkit.getImageFromCorners(
        minx=bounds_wgs84[0], miny=bounds_wgs84[1],
        maxx=bounds_wgs84[2], maxy=bounds_wgs84[3],
        zoomlevel=zoomlevel,
        tileServer=tileServer,
        inputCRS=WSG84,
        outputCRS=outputCRS if outputCRS is not None else ITM
    )

    # Create figure
    if ax is None:
        fig, ax = plt.subplots(1, 1, figsize=figsize)

    # Plot the map tiles as background
    tilesToolkit.presentation.plot(tile_img, outputCRS=outputCRS if outputCRS is not None else ITM, ax=ax)

    # Reproject population data to output CRS
    if outputCRS is not None:
        plot_data = plot_data.to_crs(epsg=outputCRS if isinstance(outputCRS, int) else outputCRS)

    # Compute density if requested
    if density:
        if density_units is None:
            density_units = ureg.km ** 2
        area_m2 = plot_data.geometry.area
        conversion_factor = (1.0 * ureg.m ** 2).to(density_units).magnitude
        area_in_units = area_m2 * conversion_factor
        plot_col = f"{populationType}_density"
        plot_data[plot_col] = (plot_data[populationType] / area_in_units).replace([np.inf, -np.inf], 0).fillna(0)
        units_str = f"{density_units:~P}"
    else:
        plot_col = populationType
        units_str = None

    # Overlay population choropleth
    plot_data.plot(
        column=plot_col, ax=ax,
        cmap=cmap, vmin=vmin, vmax=vmax,
        alpha=alpha,
        edgecolor=edgecolor, linewidth=linewidth,
        legend=False
    )

    if colorbar:
        if colorbar_label:
            label = colorbar_label
        elif density and units_str:
            label = f"Population density [people/{units_str}]"
        else:
            label = "Population count"
        sm = plt.cm.ScalarMappable(
            cmap=cmap,
            norm=mcolors.Normalize(
                vmin=vmin if vmin is not None else plot_data[plot_col].min(),
                vmax=vmax if vmax is not None else plot_data[plot_col].max()
            )
        )
        sm._A = []
        cbar = plt.colorbar(sm, ax=ax)
        cbar.set_label(label)

    if title is not None:
        ax.set_title(title)

    return ax

DemographyToolkit — Analysis

hera.measurements.GIS.vector.demography.analysis

Analysis of the demography toolkit.

Source code in hera/measurements/GIS/vector/demography.py
class analysis:
    """
        Analysis of the demography toolkit.
    """

    _datalayer = None

    @property
    def datalayer(self):
        """DemographyToolkit : Reference to the parent data layer."""
        return self._datalayer

    def __init__(self, dataLayer):
        """Initialize the demography analysis with a reference to the data layer.

        Parameters
        ----------
        dataLayer : DemographyToolkit
            The parent demography toolkit instance.
        """
        self._datalayer = dataLayer

    def createNewArea(self,
                      shapeNameOrData,
                      dataSourceOrData,
                      dataSourceVersion=None,
                      populationTypes=None,
                      convex=True,
                      saveMode=TOOLKIT_SAVEMODE_NOSAVE,
                      regionName=None, metadata=dict()):
        """
            Make a geoDataFrame with a polygon as the geometry,
            and the sum of the population in the polygons that intersect it as its population.

            If saveMode is set to save to file (with or without DB) the regionName
            is used as the file name.

        Parameters
        -----------
        shapeNameOrData: str, geopandas
            A shape name, geopandas dataframe, geoJSON str

        dataSourceOrData: str, geopandas
            A demography data source name, a geoJSON (shapes with the population) or geopandas.dataframe

        dataSourceVersion: 3-tuple of int
            A version of the demography data source

        convex: bool

        saveMode: str
                Can be either:

                    - TOOLKIT_SAVEMODE_NOSAVE   : Just load the data from file and return the datafile

                    - TOOLKIT_SAVEMODE_ONLYFILE : Loads the data from file and save to a file.
                                                  raise exception if file exists.

                    - TOOLKIT_SAVEMODE_ONLYFILE_REPLACE: Loads the data from file and save to a file.
                                                  Replace the file if it exists.

                    - TOOLKIT_SAVEMODE_FILEANDDB : Loads the data from file and save to a file and store to the DB as a source.
                                                    Raise exception if the entry exists.

                    - TOOLKIT_SAVEMODE_FILEANDDB_REPLACE: Loads the data from file and save to a file and store to the DB as a source.
                                                    Replace the entry in the DB if it exists.

        regionName: str
            optional. If saved to the DB, use this as a region.
        metadata: dict
            Metadata to be saved to the DB if needed.

        Returns
        -------
            Document with the new data
        """
        # 🛡️ Ensure regionName is provided if saving is requested
        if saveMode in [TOOLKIT_SAVEMODE_ONLYFILE,
                        TOOLKIT_SAVEMODE_ONLYFILE_REPLACE,
                        TOOLKIT_SAVEMODE_FILEANDDB,
                        TOOLKIT_SAVEMODE_FILEANDDB_REPLACE] and regionName is None:
            raise ValueError("Must specify regionName if saveMode is set to save data")

        # 📦 Load population layer
        if isinstance(dataSourceOrData, str):
            Data = self.datalayer.getDataSourceData(dataSourceOrData, dataSourceVersion)
            if Data is None:
                if os.path.exists(dataSourceOrData):
                    Data = geopandas.read_file(dataSourceOrData)  # ← נתיב לקובץ (shp/geojson/gpkg)
                else:
                    Data = geopandas.read_file(io.StringIO(dataSourceOrData))  # ← תוכן טקסטואלי (GeoJSON)
        else:
            Data = dataSourceOrData

        # 📦 Load the polygon
        if isinstance(shapeNameOrData, str):
            polydoc = self.datalayer.shapes.getShape(shapeNameOrData)
            if polydoc is None:
                polydoc = geopandas.read_file(io.StringIO(shapeNameOrData))
        elif isinstance(shapeNameOrData, geopandas.geodataframe.GeoDataFrame):
            polydoc = shapeNameOrData
        else:
            poly = shapeNameOrData  # already a shapely geometry

        # 🧠 Construct final geometry if needed
        if isinstance(shapeNameOrData, str) or isinstance(shapeNameOrData, geopandas.geodataframe.GeoDataFrame):
            if convex:
                polys = self.datalayer.buildings.analysis.ConvexPolygons(polydoc)
                poly = polys.loc[polys.area == polys.area.max()].geometry[0]
            else:
                poly = polydoc.unary_union

        # 📐 Intersect population data with polygon (including topology fix)
        try:
            res_intersect_poly = Data.loc[Data["geometry"].intersection(poly).is_empty == False]
        except Exception as e:
            from shapely.errors import TopologicalError
            if isinstance(e, TopologicalError):
                if not poly.is_valid:
                    poly = poly.buffer(0)
                Data["geometry"] = Data["geometry"].apply(lambda g: g if g.is_valid else g.buffer(0))
                res_intersect_poly = Data.loc[Data["geometry"].intersection(poly).is_empty == False]
            else:
                raise e

        # 🧾 Construct resulting GeoDataFrame
        newData = geopandas.GeoDataFrame.from_dict([{"geometry": poly}])
        newData.crs = Data.crs

        # 🧮 Sum population attributes
        populationTypes = list(
            self.datalayer.populationTypes.values()) if populationTypes is None else populationTypes
        for populationType in populationTypes:
            if populationType in res_intersect_poly:
                newData[populationType] = res_intersect_poly[[populationType]].sum()[populationType]

        doc = None  # will hold DB document if stored

        # 💾 Save to file or DB if required
        if saveMode != TOOLKIT_SAVEMODE_NOSAVE:
            filename = regionName if "." in regionName else f"{regionName}.shp"
            fullname = os.path.join(self.datalayer.filesDirectory, filename)
            newData.to_file(fullname)

            if saveMode == TOOLKIT_SAVEMODE_FILEANDDB:
                desc = {
                    toolkit.TOOLKIT_DATASOURCE_NAME: regionName,
                    toolkit.TOOLKIT_TOOLKITNAME_FIELD: self.datalayer.toolkitName
                }
                desc.update(**metadata)
                doc = self.datalayer.addCacheDocument(
                    desc=desc,
                    resource=fullname,
                    type=toolkit.TOOLKIT_DATASOURCE_TYPE,
                    dataFormat=datatypes.GEOPANDAS
                )

        # 📤 Return result: either wrapped metadata frame or DB document
        return nonDBMetadataFrame(newData) if doc is None else doc

    def calculatePopulationInPolygon(self,
                                     shapelyPolygon,
                                     dataSourceOrData,
                                     dataSourceVersion=None,
                                    populationTypes=None):
        """
            Finds the population in a polygon.

        Parameters:
        -----------

            shapeNameOrData: str, shapely.Polygon, geopandas
                    The polygon to calculate the poulation in.

                    Can be either:
                        - the polygon itself (shapely.Polygon)
                        - shape name in the DB (str)
                        - geoJSON (str)
                        - geopandas.

            dataSourceOrData: str or geopandas
                    The demographic data.

                    Can be either:
                        - demography data source name
                        - geoJSON (str)
                        - geopandas


            dataSourceVersion: 3-tuple of int
                    If dataSourceOrData is demography data source name
                    then dataSourceVersion is a possible version.

            populationTypes: str or list of str
                    Additional population columns that will be calculated.

        Returns
        -------
            geopandas
            The intersection of the demography and the polygon.

        """


        # if isinstance(shapeNameOrData,str):
        #     poly = self.datalayer.shapes.getShape(shapeNameOrData)
        #     if poly is None:
        #         poly = geopandas.read_file(io.StringIO(shapeNameOrData))
        # else:

        poly = shapelyPolygon

        if isinstance(dataSourceOrData, str):
            demography = self.datalayer.getDataSourceData(dataSourceOrData, dataSourceVersion)
            if demography is None:
                if os.path.exists(dataSourceOrData):
                    demography = gpd.read_file(dataSourceOrData)  # ← נתיב לקובץ (shp/gpkg/geojson)
                else:
                    demography = gpd.read_file(io.StringIO(dataSourceOrData))  # ← טקסט (GeoJSON)
        else:
            demography = dataSourceOrData

        populationTypes = self.datalayer.populationTypes if populationTypes is None else populationTypes

        if isinstance(populationTypes,str):
            populationTypes = [populationTypes]

        res_intersect_poly = demography.loc[demography["geometry"].intersection(poly).is_empty == False]
        intersection_poly = res_intersect_poly["geometry"].intersection(poly)

        res_intersection = geopandas.GeoDataFrame.from_dict(
            {"geometry": intersection_poly.geometry,
             "areaFraction": intersection_poly.area / res_intersect_poly.area})

        for populationType in populationTypes:
            populationType = self.datalayer.populationTypes.get(populationType,populationType)
            if populationType in res_intersect_poly:
                res_intersection[populationType] = intersection_poly.area / res_intersect_poly.area * res_intersect_poly[populationType]

        return res_intersection

datalayer property

DemographyToolkit : Reference to the parent data layer.

__init__(dataLayer)

Initialize the demography analysis with a reference to the data layer.

Parameters:

Name Type Description Default
dataLayer DemographyToolkit

The parent demography toolkit instance.

required
Source code in hera/measurements/GIS/vector/demography.py
def __init__(self, dataLayer):
    """Initialize the demography analysis with a reference to the data layer.

    Parameters
    ----------
    dataLayer : DemographyToolkit
        The parent demography toolkit instance.
    """
    self._datalayer = dataLayer

createNewArea(shapeNameOrData, dataSourceOrData, dataSourceVersion=None, populationTypes=None, convex=True, saveMode=TOOLKIT_SAVEMODE_NOSAVE, regionName=None, metadata=dict())

Make a geoDataFrame with a polygon as the geometry,
and the sum of the population in the polygons that intersect it as its population.

If saveMode is set to save to file (with or without DB) the regionName
is used as the file name.

Parameters:

Name Type Description Default
shapeNameOrData

A shape name, geopandas dataframe, geoJSON str

required
dataSourceOrData

A demography data source name, a geoJSON (shapes with the population) or geopandas.dataframe

required
dataSourceVersion

A version of the demography data source

None
convex
True
saveMode
Can be either:

    - TOOLKIT_SAVEMODE_NOSAVE   : Just load the data from file and return the datafile

    - TOOLKIT_SAVEMODE_ONLYFILE : Loads the data from file and save to a file.
                                  raise exception if file exists.

    - TOOLKIT_SAVEMODE_ONLYFILE_REPLACE: Loads the data from file and save to a file.
                                  Replace the file if it exists.

    - TOOLKIT_SAVEMODE_FILEANDDB : Loads the data from file and save to a file and store to the DB as a source.
                                    Raise exception if the entry exists.

    - TOOLKIT_SAVEMODE_FILEANDDB_REPLACE: Loads the data from file and save to a file and store to the DB as a source.
                                    Replace the entry in the DB if it exists.
TOOLKIT_SAVEMODE_NOSAVE
regionName

optional. If saved to the DB, use this as a region.

None
metadata

Metadata to be saved to the DB if needed.

dict()

Returns:

Type Description
Document with the new data
Source code in hera/measurements/GIS/vector/demography.py
def createNewArea(self,
                  shapeNameOrData,
                  dataSourceOrData,
                  dataSourceVersion=None,
                  populationTypes=None,
                  convex=True,
                  saveMode=TOOLKIT_SAVEMODE_NOSAVE,
                  regionName=None, metadata=dict()):
    """
        Make a geoDataFrame with a polygon as the geometry,
        and the sum of the population in the polygons that intersect it as its population.

        If saveMode is set to save to file (with or without DB) the regionName
        is used as the file name.

    Parameters
    -----------
    shapeNameOrData: str, geopandas
        A shape name, geopandas dataframe, geoJSON str

    dataSourceOrData: str, geopandas
        A demography data source name, a geoJSON (shapes with the population) or geopandas.dataframe

    dataSourceVersion: 3-tuple of int
        A version of the demography data source

    convex: bool

    saveMode: str
            Can be either:

                - TOOLKIT_SAVEMODE_NOSAVE   : Just load the data from file and return the datafile

                - TOOLKIT_SAVEMODE_ONLYFILE : Loads the data from file and save to a file.
                                              raise exception if file exists.

                - TOOLKIT_SAVEMODE_ONLYFILE_REPLACE: Loads the data from file and save to a file.
                                              Replace the file if it exists.

                - TOOLKIT_SAVEMODE_FILEANDDB : Loads the data from file and save to a file and store to the DB as a source.
                                                Raise exception if the entry exists.

                - TOOLKIT_SAVEMODE_FILEANDDB_REPLACE: Loads the data from file and save to a file and store to the DB as a source.
                                                Replace the entry in the DB if it exists.

    regionName: str
        optional. If saved to the DB, use this as a region.
    metadata: dict
        Metadata to be saved to the DB if needed.

    Returns
    -------
        Document with the new data
    """
    # 🛡️ Ensure regionName is provided if saving is requested
    if saveMode in [TOOLKIT_SAVEMODE_ONLYFILE,
                    TOOLKIT_SAVEMODE_ONLYFILE_REPLACE,
                    TOOLKIT_SAVEMODE_FILEANDDB,
                    TOOLKIT_SAVEMODE_FILEANDDB_REPLACE] and regionName is None:
        raise ValueError("Must specify regionName if saveMode is set to save data")

    # 📦 Load population layer
    if isinstance(dataSourceOrData, str):
        Data = self.datalayer.getDataSourceData(dataSourceOrData, dataSourceVersion)
        if Data is None:
            if os.path.exists(dataSourceOrData):
                Data = geopandas.read_file(dataSourceOrData)  # ← נתיב לקובץ (shp/geojson/gpkg)
            else:
                Data = geopandas.read_file(io.StringIO(dataSourceOrData))  # ← תוכן טקסטואלי (GeoJSON)
    else:
        Data = dataSourceOrData

    # 📦 Load the polygon
    if isinstance(shapeNameOrData, str):
        polydoc = self.datalayer.shapes.getShape(shapeNameOrData)
        if polydoc is None:
            polydoc = geopandas.read_file(io.StringIO(shapeNameOrData))
    elif isinstance(shapeNameOrData, geopandas.geodataframe.GeoDataFrame):
        polydoc = shapeNameOrData
    else:
        poly = shapeNameOrData  # already a shapely geometry

    # 🧠 Construct final geometry if needed
    if isinstance(shapeNameOrData, str) or isinstance(shapeNameOrData, geopandas.geodataframe.GeoDataFrame):
        if convex:
            polys = self.datalayer.buildings.analysis.ConvexPolygons(polydoc)
            poly = polys.loc[polys.area == polys.area.max()].geometry[0]
        else:
            poly = polydoc.unary_union

    # 📐 Intersect population data with polygon (including topology fix)
    try:
        res_intersect_poly = Data.loc[Data["geometry"].intersection(poly).is_empty == False]
    except Exception as e:
        from shapely.errors import TopologicalError
        if isinstance(e, TopologicalError):
            if not poly.is_valid:
                poly = poly.buffer(0)
            Data["geometry"] = Data["geometry"].apply(lambda g: g if g.is_valid else g.buffer(0))
            res_intersect_poly = Data.loc[Data["geometry"].intersection(poly).is_empty == False]
        else:
            raise e

    # 🧾 Construct resulting GeoDataFrame
    newData = geopandas.GeoDataFrame.from_dict([{"geometry": poly}])
    newData.crs = Data.crs

    # 🧮 Sum population attributes
    populationTypes = list(
        self.datalayer.populationTypes.values()) if populationTypes is None else populationTypes
    for populationType in populationTypes:
        if populationType in res_intersect_poly:
            newData[populationType] = res_intersect_poly[[populationType]].sum()[populationType]

    doc = None  # will hold DB document if stored

    # 💾 Save to file or DB if required
    if saveMode != TOOLKIT_SAVEMODE_NOSAVE:
        filename = regionName if "." in regionName else f"{regionName}.shp"
        fullname = os.path.join(self.datalayer.filesDirectory, filename)
        newData.to_file(fullname)

        if saveMode == TOOLKIT_SAVEMODE_FILEANDDB:
            desc = {
                toolkit.TOOLKIT_DATASOURCE_NAME: regionName,
                toolkit.TOOLKIT_TOOLKITNAME_FIELD: self.datalayer.toolkitName
            }
            desc.update(**metadata)
            doc = self.datalayer.addCacheDocument(
                desc=desc,
                resource=fullname,
                type=toolkit.TOOLKIT_DATASOURCE_TYPE,
                dataFormat=datatypes.GEOPANDAS
            )

    # 📤 Return result: either wrapped metadata frame or DB document
    return nonDBMetadataFrame(newData) if doc is None else doc

calculatePopulationInPolygon(shapelyPolygon, dataSourceOrData, dataSourceVersion=None, populationTypes=None)

Finds the population in a polygon.
Parameters:
shapeNameOrData: str, shapely.Polygon, geopandas
        The polygon to calculate the poulation in.

        Can be either:
            - the polygon itself (shapely.Polygon)
            - shape name in the DB (str)
            - geoJSON (str)
            - geopandas.

dataSourceOrData: str or geopandas
        The demographic data.

        Can be either:
            - demography data source name
            - geoJSON (str)
            - geopandas


dataSourceVersion: 3-tuple of int
        If dataSourceOrData is demography data source name
        then dataSourceVersion is a possible version.

populationTypes: str or list of str
        Additional population columns that will be calculated.

Returns:

Type Description
geopandas

The intersection of the demography and the polygon.

Source code in hera/measurements/GIS/vector/demography.py
def calculatePopulationInPolygon(self,
                                 shapelyPolygon,
                                 dataSourceOrData,
                                 dataSourceVersion=None,
                                populationTypes=None):
    """
        Finds the population in a polygon.

    Parameters:
    -----------

        shapeNameOrData: str, shapely.Polygon, geopandas
                The polygon to calculate the poulation in.

                Can be either:
                    - the polygon itself (shapely.Polygon)
                    - shape name in the DB (str)
                    - geoJSON (str)
                    - geopandas.

        dataSourceOrData: str or geopandas
                The demographic data.

                Can be either:
                    - demography data source name
                    - geoJSON (str)
                    - geopandas


        dataSourceVersion: 3-tuple of int
                If dataSourceOrData is demography data source name
                then dataSourceVersion is a possible version.

        populationTypes: str or list of str
                Additional population columns that will be calculated.

    Returns
    -------
        geopandas
        The intersection of the demography and the polygon.

    """


    # if isinstance(shapeNameOrData,str):
    #     poly = self.datalayer.shapes.getShape(shapeNameOrData)
    #     if poly is None:
    #         poly = geopandas.read_file(io.StringIO(shapeNameOrData))
    # else:

    poly = shapelyPolygon

    if isinstance(dataSourceOrData, str):
        demography = self.datalayer.getDataSourceData(dataSourceOrData, dataSourceVersion)
        if demography is None:
            if os.path.exists(dataSourceOrData):
                demography = gpd.read_file(dataSourceOrData)  # ← נתיב לקובץ (shp/gpkg/geojson)
            else:
                demography = gpd.read_file(io.StringIO(dataSourceOrData))  # ← טקסט (GeoJSON)
    else:
        demography = dataSourceOrData

    populationTypes = self.datalayer.populationTypes if populationTypes is None else populationTypes

    if isinstance(populationTypes,str):
        populationTypes = [populationTypes]

    res_intersect_poly = demography.loc[demography["geometry"].intersection(poly).is_empty == False]
    intersection_poly = res_intersect_poly["geometry"].intersection(poly)

    res_intersection = geopandas.GeoDataFrame.from_dict(
        {"geometry": intersection_poly.geometry,
         "areaFraction": intersection_poly.area / res_intersect_poly.area})

    for populationType in populationTypes:
        populationType = self.datalayer.populationTypes.get(populationType,populationType)
        if populationType in res_intersect_poly:
            res_intersection[populationType] = intersection_poly.area / res_intersect_poly.area * res_intersect_poly[populationType]

    return res_intersection

GIS - Raster

TopographyToolkit (Raster)

hera.measurements.GIS.raster.topography.TopographyToolkit

Bases: abstractToolkit

Toolkit for raster-based topography and elevation operations.

Source code in hera/measurements/GIS/raster/topography.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
class TopographyToolkit(toolkit.abstractToolkit):
    """Toolkit for raster-based topography and elevation operations."""

    def __init__(self, projectName, filesDirectory=None,connectionName=None):
        """
        Initializes the TopographyToolkit.

        Parameters
        ----------
        projectName : str
            The project Name that the toolkit is initialized on.
        filesDirectory : str or None
            The path to save region files when they are created.

            - If str, it represents a path (relative or absolute) to save the files. The directory is created automatically.
            - If None, tries to get the default path of the project from the config. If it does not
              exist, then uses the current working directory.
        """

        # Important change:
        # Instead of passing a full Project object to the parent class (Toolkit),
        # we now pass only the project name (as a string).
        # This is necessary because MongoDB expects simple types like strings,
        # and cannot serialize full complex Python objects (like Project instances).
        super().__init__(projectName=projectName, toolkitName='TopographyToolkit', filesDirectory=filesDirectory,connectionName=connectionName)

        # Initialize the analysis module for topography calculations
        self._analysis = topographyAnalysis(self)
    def findElevationFile(self, filename, dataSourceName):
        """
        Attempts to find the .hgt file in one of the registered resource folders.
        Supports both single path or list of paths.
        """
        resources = self.getDataSourceData(dataSourceName)
        if isinstance(resources, str):
            resources = [resources]

        for folder in resources:
            candidate = os.path.join(folder, filename)
            if os.path.exists(candidate):
                return candidate

        raise FileNotFoundError(f"{filename} not found in any of: {resources}")


    def getPointElevation(self,lat, long,dataSourceName=None):
        """
            get the elevation above sea level in meters from DEM for a point
            SRTM30 means 30 meters resolution

        Parameters
        ----------
        lat : float
            Latitiute in the WSG projection
        long : float
            Longitude in the WSG projection

        dataSourceName: string
            The name of the datasources loaded. Use getDataSourceList to see which datasource were loaded.

        Returns
        -------
        eleveation above sea level

        How to use
        ----------
        from hera import toolkitHome
        tk = toolkitHome.getToolkit(toolkitName=toolkitHome.GIS_RASTER)
        tk.getPointElevation(lat = 33.459,long = 35.755)

        This should return ~  ~820 according to amud anan.
        """
        logger = get_classMethod_logger(self,"getPointElevation")

        filename = 'N'+str(int(lat))+'E'+str(int(long)).zfill(3)+'.hgt'
        logger.info(f"Getting elevation from file {filename}")

        if dataSourceName is None:
            dataSourceName = self.getConfig()['defaultSRTM']

        if dataSourceName is None:
            raise ValueError(f"The datasource is not found!")

        logger.debug(f"Getting the data source {dataSourceName}")
        fheight = self.findElevationFile(filename, dataSourceName)

        ds =  gdal.Open(fheight)
        myarray = numpy.array(ds.GetRasterBand(1).ReadAsArray())
        myarray[myarray < -1000] = 0  # deal with problematic points
        gt = ds.GetGeoTransform()
        rastery = (long - gt[0]) / gt[1]
        rasterx = (lat - gt[3]) / gt[5]
        height11 = myarray[int(rasterx), int(rastery)]
        if (int(rasterx)+1>=myarray.shape[0]) or (int(rastery)+1>=myarray.shape[1]):
            height12 = height11
            height21 = height11
            height22 = height11
        else:
            height12 = myarray[int(rasterx) + 1, int(rastery)]
            height21 = myarray[int(rasterx), int(rastery) + 1]
            height22 = myarray[int(rasterx) + 1, int(rastery) + 1]
        height1 = (1. - (rasterx - int(rasterx))) * height11 + (rasterx - int(rasterx)) * height12
        height2 = (1. - (rasterx - int(rasterx))) * height21 + (rasterx - int(rasterx)) * height22
        elevation = (1. - (rastery - int(rastery))) * height1 + (rastery - int(rastery)) * height2

        return elevation

    def getPointListElevation(self, pointList, dataSourceName=None, inputCRS=WSG84):
        """
            Return the elevation of the point list.

            For now we assume that pointList is in WSG84 coordinates.

        Parameters
        ----------
        latList
        longList
        dataSourceName

        Returns
        -------

        """
        totalNumberOfPoints = pointList.shape[0]
        logger = get_classMethod_logger(self, "getPointListElevation")
        logger.info(f"Computing the elevation for a list of {totalNumberOfPoints} points")
        logger.debug("Computing the file name for each point")


        if isinstance(pointList,geopandas.geoseries.GeoSeries):
            pointList = pointList.to_frame("point").assign(filename=pointList.apply(lambda row: 'N' + str(int(row.y)) + 'E' + str(int(row.x)).zfill(3) + '.hgt' ),
                                                           lon=pointList.x,
                                                           lat=pointList.y,
                                                           elevation=0)

        elif isinstance(pointList,geopandas.geodataframe.GeoDataFrame):
            if 'point' not in pointList:
                error = "GeoDataFrame must contain a field 'point' that contain the points of interest"
                logger.error(error)
                raise ValueError(error)
            pointList = pointList.assign(filename=pointList.apply(lambda row: 'N' + str(int(row.point.y)) + 'E' + str(int(row.point.x)).zfill(3) + '.hgt', axis=1),
                                         lon=pointList.point.x,
                                         lat=pointList.point.y,
                                         elevation=0)
        else:
            pointList = pointList.assign(filename=pointList.apply(lambda x: 'N' + str(int(x.lat)) + 'E' + str(int(x.lon)).zfill(3) + '.hgt' ,axis=1),elevation=0)


        if dataSourceName is None:
            dataSourceName = self.getConfig()['defaultSRTM']
        logger.info(f"Getting the elevation for the points. Using datasource {dataSourceName}")

        if dataSourceName is None:
            err = "The datasource is not found!"
            logger.error(err)
            raise ValueError(err)

        processed = 0

        for grpid,grp in pointList.groupby("filename"):
            logger.info(f"\tProcessing the group {grpid} with {grp.shape[0]}. Processed so far {processed}/{totalNumberOfPoints}")
            fheight = self.findElevationFile(grpid, dataSourceName)
            logger.debug(f"Getting data from the file {fheight}")
            ds = gdal.Open(fheight)
            myarray = numpy.array(ds.GetRasterBand(1).ReadAsArray())
            myarray[myarray < -1000] = 0  # deal with problematic points
            gt = ds.GetGeoTransform()
            for itemindx,item in grp.iterrows():
                rastery = (item.lon - gt[0]) / gt[1]
                rasterx = (item.lat - gt[3]) / gt[5]
                height11 = myarray[int(rasterx), int(rastery)]
                if (int(rasterx)+1>=myarray.shape[0]) or (int(rastery)+1>=myarray.shape[1]):
                    height12 = height11
                    height21 = height11
                    height22 = height11
                else:
                    height12 = myarray[int(rasterx) + 1, int(rastery)]
                    height21 = myarray[int(rasterx), int(rastery) + 1]
                    height22 = myarray[int(rasterx) + 1, int(rastery) + 1]
                height1 = (1. - (rasterx - int(rasterx))) * height11 + (rasterx - int(rasterx)) * height12
                height2 = (1. - (rasterx - int(rasterx))) * height21 + (rasterx - int(rasterx)) * height22
                elevation = (1. - (rastery - int(rastery))) * height1 + (rastery - int(rastery)) * height2
                pointList.loc[itemindx,'elevation'] = elevation
                processed += grp.shape[0]
        return pointList



    def convertPointsCRS(self, points, inputCRS, outputCRS, **kwargs):
        """
        Convert list/array/DataFrame of points from one CRS to another, using GeoPandas.

        Parameters
        ----------
        points : list of tuples, numpy array, or pandas.DataFrame
            Points to convert.

        inputCRS : int
            EPSG code of input coordinate system.

        outputCRS : int
            EPSG code of output coordinate system.

        Returns
        -------
        geopandas.GeoDataFrame
            Converted points with 'geometry' column.
        """

        if isinstance(points, np.ndarray):
            if points.ndim == 1:
                gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy([points[0]], [points[1]]))
            else:
                gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy(points[:, 0], points[:, 1]))

        elif isinstance(points, pd.DataFrame):
            x_col = kwargs.get("x", "x")
            y_col = kwargs.get("y", "y")
            gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy(points[x_col], points[y_col]))

        elif isinstance(points, list):
            if len(points) == 1:
                gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy([points[0][0]], [points[0][1]]))
            else:
                gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy([x[0] for x in points], [x[1] for x in points]))

        else:
            raise ValueError(f"Unsupported type: {type(points)}")

        gdf.set_crs(inputCRS, inplace=True)
        return gdf.to_crs(outputCRS)

    def create_xarray(self, minx, miny, maxx, maxy, dxdy=30, inputCRS=WSG84):
        """
        Create an xarray grid of lat/lon points within the given bounding box.

        Parameters
        ----------
        minx, miny, maxx, maxy : float
            Bounding box coordinates.
        dxdy : float
            Grid spacing (in meters for ITM or degrees for WGS84).
        inputCRS : int
            EPSG code of the coordinate system (default is WGS84).

        Returns
        -------
        xarray.Dataset
            Dataset containing lat/lon coordinates over the i, j grid.

        Notes
        -----
        ✅ Added memory guard: throws an error if grid is larger than 1,000,000 points.
        """

        if inputCRS == WSG84:
            # Convert lat/lon to ITM using internal conversion method
            min_pp = self.convertPointsCRS(points=[[miny, minx]], inputCRS=inputCRS, outputCRS=ITM).geometry.iloc[0]
            max_pp = self.convertPointsCRS(points=[[maxy, maxx]], inputCRS=inputCRS, outputCRS=ITM).geometry.iloc[0]
        else:
            min_pp = Point(minx, miny)
            max_pp = Point(maxx, maxy)

        x = np.arange(min_pp.x, max_pp.x, dxdy)
        y = np.arange(min_pp.y, max_pp.y, dxdy)

        # 🔒 Memory guard
        if len(x) * len(y) > 1_000_000:
            raise MemoryError(f"Too many grid points: {len(x)} x {len(y)} = {len(x) * len(y)}. "
                              f"Increase dxdy or reduce area.")

        xx, yy = np.meshgrid(x, y[::-1])
        grid_points = pd.DataFrame({'x': xx.ravel(), 'y': yy.ravel()})

        gdf = gpd.GeoDataFrame(
            grid_points,
            geometry=gpd.points_from_xy(grid_points['x'], grid_points['y']),
            crs=ITM
        )

        # Convert back to WGS84
        gdf_transformed = gdf.to_crs(WSG84)
        gdf_transformed['lat'] = gdf_transformed.geometry.y
        gdf_transformed['lon'] = gdf_transformed.geometry.x

        lat_grid = gdf_transformed['lat'].values.reshape(xx.shape)
        lon_grid = gdf_transformed['lon'].values.reshape(xx.shape)

        i = np.arange(xx.shape[0])
        j = np.arange(xx.shape[1])

        return xr.Dataset(
            coords={
                'i': i,
                'j': j,
            },
            data_vars={
                'lat': (['i', 'j'], lat_grid),
                'lon': (['i', 'j'], lon_grid)
            }
        )

    def getElevation(self, minx, miny, maxx, maxy, dxdy=30, inputCRS=WSG84, dataSourceName=None):
        """
        Generates elevation data over a rectangular area using given resolution.

        Parameters
        ----------
        minx, miny, maxx, maxy : float
            Bounding box coordinates of the area to analyze.
        dxdy : float
            Resolution (spacing) in coordinate units (default is 30).
        inputCRS : CRS
            Coordinate reference system of the input coordinates.
        dataSourceName : str, optional
            Name of the data source to fetch elevations from.

        Returns
        -------
        xarray.Dataset
            Dataset with 'lat', 'lon' and calculated 'elevation' layer.

        Notes
        -----
        🔧 FIXED: Replaced xarray_dataset.shape access with .shape from 'lat' variable,
        since xarray.Dataset has no shape attribute.
        """

        # Create initial lat/lon grid
        xarray_dataset = self.create_xarray(minx, miny, maxx, maxy, dxdy, inputCRS)

        # Flatten to point list
        pointList = pd.DataFrame({
            'lat': xarray_dataset['lat'].values.flatten(),
            'lon': xarray_dataset['lon'].values.flatten()
        })

        # Get elevations
        elevation_df = self.getPointListElevation(pointList, dataSourceName)

        # 🔧 FIX: Use shape of lat array instead of dataset shape
        i_dim, j_dim = xarray_dataset['lat'].shape

        # Add elevation coordinate to dataset
        xarray_dataset = xarray_dataset.assign_coords(
            elevation=(('i', 'j'), elevation_df['elevation'].values.reshape(i_dim, j_dim))
        )

        return xarray_dataset

    def getElevationOfXarray(self, xarray_dataset, dataSourceName=None):
        """
        Computes elevation values for each (lat, lon) point in an xarray dataset
        and returns the same dataset with an added 'elevation' coordinate.

        Parameters
        ----------
        xarray_dataset : xarray.Dataset
            Dataset with 'lat' and 'lon' variables defined over dimensions ['i', 'j']
        dataSourceName : str, optional
            Name of the data source to use. If not given, will be extracted from config.

        Returns
        -------
        xarray.Dataset
            Same dataset with added 'elevation' coordinate over ['i', 'j']

        Notes
        -----
        🔧 FIXED: previously tried to access `xarray_dataset.shape` which doesn't exist on Dataset.
        Now correctly gets shape from 'lat' variable.
        """

        # Convert the xarray dataset into a flat DataFrame of points
        pointList = pd.DataFrame({
            'lat': xarray_dataset['lat'].values.flatten(),
            'lon': xarray_dataset['lon'].values.flatten()
        })

        # Get elevation values for the points
        elevation_df = self.getPointListElevation(pointList, dataSourceName)

        # 🔧 FIX: Replace xarray_dataset.shape with actual shape from 'lat' variable
        i_dim, j_dim = xarray_dataset['lat'].shape

        # Assign elevation as a new coordinate in the dataset
        xarray_dataset = xarray_dataset.assign_coords(
            elevation=(['i', 'j'], elevation_df['elevation'].values.reshape(i_dim, j_dim))
        )

        return xarray_dataset

    def createElevationSTL(self, minx, miny, maxx, maxy, dxdy = 30,shiftx=0,shifty=0,inputCRS=WSG84, dataSourceName=None, solidName="Topography"):
        """
            Return the STL string from xarray dataset with the following fields:
        Parameters
        ----------
        lowerleft_point : float
                The lower left corner
        upperright_point: float
                The upper right corner

        dxdy : float
                The resolution in m (default m).
        inputCRS : The ESPG code of the input projection.

        outputCRS : The ESPG code of the output projection.
                    [Default ITM]

        shiftx : Used when one wants to set another point as origin center

        shifty : Used when one wants to set another point as origin center

        Returns
        -------

        """
        elevation = self.getElevation(minx=minx,miny=miny, maxx=maxx, maxy=maxy, dxdy=dxdy, inputCRS=inputCRS, dataSourceName=dataSourceName)

        return self.getElevationSTL(elevation,shiftx,shifty,solidName)


    def getElevationSTL(self,elevation,shiftx=0,shifty=0,solidName="Topography"):
        """
        Generates STL string from elevation dataset.

        Parameters
        ----------
        elevation : xarray.Dataset
            Dataset with 'lat', 'lon' and 'elevation' coordinates.
        shiftx, shifty : float
            Optional shifts in X and Y.
        solidName : str
            Name of the STL solid.

        Returns
        -------
        str
            STL content as string.

        Notes
        -----
        🔧 FIXED: Accessed shape using elevation['elevation'].shape instead of elevation.shape
        because xarray.Dataset has no attribute 'shape'.
        """
        grid_points = pd.DataFrame({
            'x': elevation['lat'].values.flatten(),
            'y': elevation['lon'].values.flatten()
        })
        gdf = gpd.GeoDataFrame(
            grid_points,
            geometry=gpd.points_from_xy(grid_points['y'], grid_points['x']),
            crs=WSG84
        )
        gdf_transformed = gdf.to_crs(ITM)
        gdf_transformed['y'] = gdf_transformed.geometry.y
        gdf_transformed['x'] = gdf_transformed.geometry.x

        # 🔧 FIX: Use actual shape from elevation['elevation'] instead of elevation.shape
        i_dim, j_dim = elevation['elevation'].shape
        xx = gdf_transformed['x'].values.reshape(i_dim, j_dim)
        yy = gdf_transformed['y'].values.reshape(i_dim, j_dim)
        stlstr = stlFactory().rasterToSTL(xx - shiftx, yy - shifty, elevation['elevation'].values, solidName=solidName)
        return stlstr

__init__(projectName, filesDirectory=None, connectionName=None)

Initializes the TopographyToolkit.

Parameters:

Name Type Description Default
projectName str

The project Name that the toolkit is initialized on.

required
filesDirectory str or None

The path to save region files when they are created.

  • If str, it represents a path (relative or absolute) to save the files. The directory is created automatically.
  • If None, tries to get the default path of the project from the config. If it does not exist, then uses the current working directory.
None
Source code in hera/measurements/GIS/raster/topography.py
def __init__(self, projectName, filesDirectory=None,connectionName=None):
    """
    Initializes the TopographyToolkit.

    Parameters
    ----------
    projectName : str
        The project Name that the toolkit is initialized on.
    filesDirectory : str or None
        The path to save region files when they are created.

        - If str, it represents a path (relative or absolute) to save the files. The directory is created automatically.
        - If None, tries to get the default path of the project from the config. If it does not
          exist, then uses the current working directory.
    """

    # Important change:
    # Instead of passing a full Project object to the parent class (Toolkit),
    # we now pass only the project name (as a string).
    # This is necessary because MongoDB expects simple types like strings,
    # and cannot serialize full complex Python objects (like Project instances).
    super().__init__(projectName=projectName, toolkitName='TopographyToolkit', filesDirectory=filesDirectory,connectionName=connectionName)

    # Initialize the analysis module for topography calculations
    self._analysis = topographyAnalysis(self)

findElevationFile(filename, dataSourceName)

Attempts to find the .hgt file in one of the registered resource folders. Supports both single path or list of paths.

Source code in hera/measurements/GIS/raster/topography.py
def findElevationFile(self, filename, dataSourceName):
    """
    Attempts to find the .hgt file in one of the registered resource folders.
    Supports both single path or list of paths.
    """
    resources = self.getDataSourceData(dataSourceName)
    if isinstance(resources, str):
        resources = [resources]

    for folder in resources:
        candidate = os.path.join(folder, filename)
        if os.path.exists(candidate):
            return candidate

    raise FileNotFoundError(f"{filename} not found in any of: {resources}")

getPointElevation(lat, long, dataSourceName=None)

get the elevation above sea level in meters from DEM for a point
SRTM30 means 30 meters resolution

Parameters:

Name Type Description Default
lat float

Latitiute in the WSG projection

required
long float

Longitude in the WSG projection

required
dataSourceName

The name of the datasources loaded. Use getDataSourceList to see which datasource were loaded.

None

Returns:

Type Description
eleveation above sea level
How to use

from hera import toolkitHome tk = toolkitHome.getToolkit(toolkitName=toolkitHome.GIS_RASTER) tk.getPointElevation(lat = 33.459,long = 35.755)

This should return ~ ~820 according to amud anan.

Source code in hera/measurements/GIS/raster/topography.py
def getPointElevation(self,lat, long,dataSourceName=None):
    """
        get the elevation above sea level in meters from DEM for a point
        SRTM30 means 30 meters resolution

    Parameters
    ----------
    lat : float
        Latitiute in the WSG projection
    long : float
        Longitude in the WSG projection

    dataSourceName: string
        The name of the datasources loaded. Use getDataSourceList to see which datasource were loaded.

    Returns
    -------
    eleveation above sea level

    How to use
    ----------
    from hera import toolkitHome
    tk = toolkitHome.getToolkit(toolkitName=toolkitHome.GIS_RASTER)
    tk.getPointElevation(lat = 33.459,long = 35.755)

    This should return ~  ~820 according to amud anan.
    """
    logger = get_classMethod_logger(self,"getPointElevation")

    filename = 'N'+str(int(lat))+'E'+str(int(long)).zfill(3)+'.hgt'
    logger.info(f"Getting elevation from file {filename}")

    if dataSourceName is None:
        dataSourceName = self.getConfig()['defaultSRTM']

    if dataSourceName is None:
        raise ValueError(f"The datasource is not found!")

    logger.debug(f"Getting the data source {dataSourceName}")
    fheight = self.findElevationFile(filename, dataSourceName)

    ds =  gdal.Open(fheight)
    myarray = numpy.array(ds.GetRasterBand(1).ReadAsArray())
    myarray[myarray < -1000] = 0  # deal with problematic points
    gt = ds.GetGeoTransform()
    rastery = (long - gt[0]) / gt[1]
    rasterx = (lat - gt[3]) / gt[5]
    height11 = myarray[int(rasterx), int(rastery)]
    if (int(rasterx)+1>=myarray.shape[0]) or (int(rastery)+1>=myarray.shape[1]):
        height12 = height11
        height21 = height11
        height22 = height11
    else:
        height12 = myarray[int(rasterx) + 1, int(rastery)]
        height21 = myarray[int(rasterx), int(rastery) + 1]
        height22 = myarray[int(rasterx) + 1, int(rastery) + 1]
    height1 = (1. - (rasterx - int(rasterx))) * height11 + (rasterx - int(rasterx)) * height12
    height2 = (1. - (rasterx - int(rasterx))) * height21 + (rasterx - int(rasterx)) * height22
    elevation = (1. - (rastery - int(rastery))) * height1 + (rastery - int(rastery)) * height2

    return elevation

getPointListElevation(pointList, dataSourceName=None, inputCRS=WSG84)

Return the elevation of the point list.

For now we assume that pointList is in WSG84 coordinates.

Parameters:

Name Type Description Default
latList
required
longList
required
dataSourceName
None
Source code in hera/measurements/GIS/raster/topography.py
def getPointListElevation(self, pointList, dataSourceName=None, inputCRS=WSG84):
    """
        Return the elevation of the point list.

        For now we assume that pointList is in WSG84 coordinates.

    Parameters
    ----------
    latList
    longList
    dataSourceName

    Returns
    -------

    """
    totalNumberOfPoints = pointList.shape[0]
    logger = get_classMethod_logger(self, "getPointListElevation")
    logger.info(f"Computing the elevation for a list of {totalNumberOfPoints} points")
    logger.debug("Computing the file name for each point")


    if isinstance(pointList,geopandas.geoseries.GeoSeries):
        pointList = pointList.to_frame("point").assign(filename=pointList.apply(lambda row: 'N' + str(int(row.y)) + 'E' + str(int(row.x)).zfill(3) + '.hgt' ),
                                                       lon=pointList.x,
                                                       lat=pointList.y,
                                                       elevation=0)

    elif isinstance(pointList,geopandas.geodataframe.GeoDataFrame):
        if 'point' not in pointList:
            error = "GeoDataFrame must contain a field 'point' that contain the points of interest"
            logger.error(error)
            raise ValueError(error)
        pointList = pointList.assign(filename=pointList.apply(lambda row: 'N' + str(int(row.point.y)) + 'E' + str(int(row.point.x)).zfill(3) + '.hgt', axis=1),
                                     lon=pointList.point.x,
                                     lat=pointList.point.y,
                                     elevation=0)
    else:
        pointList = pointList.assign(filename=pointList.apply(lambda x: 'N' + str(int(x.lat)) + 'E' + str(int(x.lon)).zfill(3) + '.hgt' ,axis=1),elevation=0)


    if dataSourceName is None:
        dataSourceName = self.getConfig()['defaultSRTM']
    logger.info(f"Getting the elevation for the points. Using datasource {dataSourceName}")

    if dataSourceName is None:
        err = "The datasource is not found!"
        logger.error(err)
        raise ValueError(err)

    processed = 0

    for grpid,grp in pointList.groupby("filename"):
        logger.info(f"\tProcessing the group {grpid} with {grp.shape[0]}. Processed so far {processed}/{totalNumberOfPoints}")
        fheight = self.findElevationFile(grpid, dataSourceName)
        logger.debug(f"Getting data from the file {fheight}")
        ds = gdal.Open(fheight)
        myarray = numpy.array(ds.GetRasterBand(1).ReadAsArray())
        myarray[myarray < -1000] = 0  # deal with problematic points
        gt = ds.GetGeoTransform()
        for itemindx,item in grp.iterrows():
            rastery = (item.lon - gt[0]) / gt[1]
            rasterx = (item.lat - gt[3]) / gt[5]
            height11 = myarray[int(rasterx), int(rastery)]
            if (int(rasterx)+1>=myarray.shape[0]) or (int(rastery)+1>=myarray.shape[1]):
                height12 = height11
                height21 = height11
                height22 = height11
            else:
                height12 = myarray[int(rasterx) + 1, int(rastery)]
                height21 = myarray[int(rasterx), int(rastery) + 1]
                height22 = myarray[int(rasterx) + 1, int(rastery) + 1]
            height1 = (1. - (rasterx - int(rasterx))) * height11 + (rasterx - int(rasterx)) * height12
            height2 = (1. - (rasterx - int(rasterx))) * height21 + (rasterx - int(rasterx)) * height22
            elevation = (1. - (rastery - int(rastery))) * height1 + (rastery - int(rastery)) * height2
            pointList.loc[itemindx,'elevation'] = elevation
            processed += grp.shape[0]
    return pointList

convertPointsCRS(points, inputCRS, outputCRS, **kwargs)

Convert list/array/DataFrame of points from one CRS to another, using GeoPandas.

Parameters:

Name Type Description Default
points list of tuples, numpy array, or pandas.DataFrame

Points to convert.

required
inputCRS int

EPSG code of input coordinate system.

required
outputCRS int

EPSG code of output coordinate system.

required

Returns:

Type Description
GeoDataFrame

Converted points with 'geometry' column.

Source code in hera/measurements/GIS/raster/topography.py
def convertPointsCRS(self, points, inputCRS, outputCRS, **kwargs):
    """
    Convert list/array/DataFrame of points from one CRS to another, using GeoPandas.

    Parameters
    ----------
    points : list of tuples, numpy array, or pandas.DataFrame
        Points to convert.

    inputCRS : int
        EPSG code of input coordinate system.

    outputCRS : int
        EPSG code of output coordinate system.

    Returns
    -------
    geopandas.GeoDataFrame
        Converted points with 'geometry' column.
    """

    if isinstance(points, np.ndarray):
        if points.ndim == 1:
            gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy([points[0]], [points[1]]))
        else:
            gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy(points[:, 0], points[:, 1]))

    elif isinstance(points, pd.DataFrame):
        x_col = kwargs.get("x", "x")
        y_col = kwargs.get("y", "y")
        gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy(points[x_col], points[y_col]))

    elif isinstance(points, list):
        if len(points) == 1:
            gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy([points[0][0]], [points[0][1]]))
        else:
            gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy([x[0] for x in points], [x[1] for x in points]))

    else:
        raise ValueError(f"Unsupported type: {type(points)}")

    gdf.set_crs(inputCRS, inplace=True)
    return gdf.to_crs(outputCRS)

create_xarray(minx, miny, maxx, maxy, dxdy=30, inputCRS=WSG84)

Create an xarray grid of lat/lon points within the given bounding box.

Parameters:

Name Type Description Default
minx float

Bounding box coordinates.

required
miny float

Bounding box coordinates.

required
maxx float

Bounding box coordinates.

required
maxy float

Bounding box coordinates.

required
dxdy float

Grid spacing (in meters for ITM or degrees for WGS84).

30
inputCRS int

EPSG code of the coordinate system (default is WGS84).

WSG84

Returns:

Type Description
Dataset

Dataset containing lat/lon coordinates over the i, j grid.

Notes

✅ Added memory guard: throws an error if grid is larger than 1,000,000 points.

Source code in hera/measurements/GIS/raster/topography.py
def create_xarray(self, minx, miny, maxx, maxy, dxdy=30, inputCRS=WSG84):
    """
    Create an xarray grid of lat/lon points within the given bounding box.

    Parameters
    ----------
    minx, miny, maxx, maxy : float
        Bounding box coordinates.
    dxdy : float
        Grid spacing (in meters for ITM or degrees for WGS84).
    inputCRS : int
        EPSG code of the coordinate system (default is WGS84).

    Returns
    -------
    xarray.Dataset
        Dataset containing lat/lon coordinates over the i, j grid.

    Notes
    -----
    ✅ Added memory guard: throws an error if grid is larger than 1,000,000 points.
    """

    if inputCRS == WSG84:
        # Convert lat/lon to ITM using internal conversion method
        min_pp = self.convertPointsCRS(points=[[miny, minx]], inputCRS=inputCRS, outputCRS=ITM).geometry.iloc[0]
        max_pp = self.convertPointsCRS(points=[[maxy, maxx]], inputCRS=inputCRS, outputCRS=ITM).geometry.iloc[0]
    else:
        min_pp = Point(minx, miny)
        max_pp = Point(maxx, maxy)

    x = np.arange(min_pp.x, max_pp.x, dxdy)
    y = np.arange(min_pp.y, max_pp.y, dxdy)

    # 🔒 Memory guard
    if len(x) * len(y) > 1_000_000:
        raise MemoryError(f"Too many grid points: {len(x)} x {len(y)} = {len(x) * len(y)}. "
                          f"Increase dxdy or reduce area.")

    xx, yy = np.meshgrid(x, y[::-1])
    grid_points = pd.DataFrame({'x': xx.ravel(), 'y': yy.ravel()})

    gdf = gpd.GeoDataFrame(
        grid_points,
        geometry=gpd.points_from_xy(grid_points['x'], grid_points['y']),
        crs=ITM
    )

    # Convert back to WGS84
    gdf_transformed = gdf.to_crs(WSG84)
    gdf_transformed['lat'] = gdf_transformed.geometry.y
    gdf_transformed['lon'] = gdf_transformed.geometry.x

    lat_grid = gdf_transformed['lat'].values.reshape(xx.shape)
    lon_grid = gdf_transformed['lon'].values.reshape(xx.shape)

    i = np.arange(xx.shape[0])
    j = np.arange(xx.shape[1])

    return xr.Dataset(
        coords={
            'i': i,
            'j': j,
        },
        data_vars={
            'lat': (['i', 'j'], lat_grid),
            'lon': (['i', 'j'], lon_grid)
        }
    )

getElevation(minx, miny, maxx, maxy, dxdy=30, inputCRS=WSG84, dataSourceName=None)

Generates elevation data over a rectangular area using given resolution.

Parameters:

Name Type Description Default
minx float

Bounding box coordinates of the area to analyze.

required
miny float

Bounding box coordinates of the area to analyze.

required
maxx float

Bounding box coordinates of the area to analyze.

required
maxy float

Bounding box coordinates of the area to analyze.

required
dxdy float

Resolution (spacing) in coordinate units (default is 30).

30
inputCRS CRS

Coordinate reference system of the input coordinates.

WSG84
dataSourceName str

Name of the data source to fetch elevations from.

None

Returns:

Type Description
Dataset

Dataset with 'lat', 'lon' and calculated 'elevation' layer.

Notes

🔧 FIXED: Replaced xarray_dataset.shape access with .shape from 'lat' variable, since xarray.Dataset has no shape attribute.

Source code in hera/measurements/GIS/raster/topography.py
def getElevation(self, minx, miny, maxx, maxy, dxdy=30, inputCRS=WSG84, dataSourceName=None):
    """
    Generates elevation data over a rectangular area using given resolution.

    Parameters
    ----------
    minx, miny, maxx, maxy : float
        Bounding box coordinates of the area to analyze.
    dxdy : float
        Resolution (spacing) in coordinate units (default is 30).
    inputCRS : CRS
        Coordinate reference system of the input coordinates.
    dataSourceName : str, optional
        Name of the data source to fetch elevations from.

    Returns
    -------
    xarray.Dataset
        Dataset with 'lat', 'lon' and calculated 'elevation' layer.

    Notes
    -----
    🔧 FIXED: Replaced xarray_dataset.shape access with .shape from 'lat' variable,
    since xarray.Dataset has no shape attribute.
    """

    # Create initial lat/lon grid
    xarray_dataset = self.create_xarray(minx, miny, maxx, maxy, dxdy, inputCRS)

    # Flatten to point list
    pointList = pd.DataFrame({
        'lat': xarray_dataset['lat'].values.flatten(),
        'lon': xarray_dataset['lon'].values.flatten()
    })

    # Get elevations
    elevation_df = self.getPointListElevation(pointList, dataSourceName)

    # 🔧 FIX: Use shape of lat array instead of dataset shape
    i_dim, j_dim = xarray_dataset['lat'].shape

    # Add elevation coordinate to dataset
    xarray_dataset = xarray_dataset.assign_coords(
        elevation=(('i', 'j'), elevation_df['elevation'].values.reshape(i_dim, j_dim))
    )

    return xarray_dataset

getElevationOfXarray(xarray_dataset, dataSourceName=None)

Computes elevation values for each (lat, lon) point in an xarray dataset and returns the same dataset with an added 'elevation' coordinate.

Parameters:

Name Type Description Default
xarray_dataset Dataset

Dataset with 'lat' and 'lon' variables defined over dimensions ['i', 'j']

required
dataSourceName str

Name of the data source to use. If not given, will be extracted from config.

None

Returns:

Type Description
Dataset

Same dataset with added 'elevation' coordinate over ['i', 'j']

Notes

🔧 FIXED: previously tried to access xarray_dataset.shape which doesn't exist on Dataset. Now correctly gets shape from 'lat' variable.

Source code in hera/measurements/GIS/raster/topography.py
def getElevationOfXarray(self, xarray_dataset, dataSourceName=None):
    """
    Computes elevation values for each (lat, lon) point in an xarray dataset
    and returns the same dataset with an added 'elevation' coordinate.

    Parameters
    ----------
    xarray_dataset : xarray.Dataset
        Dataset with 'lat' and 'lon' variables defined over dimensions ['i', 'j']
    dataSourceName : str, optional
        Name of the data source to use. If not given, will be extracted from config.

    Returns
    -------
    xarray.Dataset
        Same dataset with added 'elevation' coordinate over ['i', 'j']

    Notes
    -----
    🔧 FIXED: previously tried to access `xarray_dataset.shape` which doesn't exist on Dataset.
    Now correctly gets shape from 'lat' variable.
    """

    # Convert the xarray dataset into a flat DataFrame of points
    pointList = pd.DataFrame({
        'lat': xarray_dataset['lat'].values.flatten(),
        'lon': xarray_dataset['lon'].values.flatten()
    })

    # Get elevation values for the points
    elevation_df = self.getPointListElevation(pointList, dataSourceName)

    # 🔧 FIX: Replace xarray_dataset.shape with actual shape from 'lat' variable
    i_dim, j_dim = xarray_dataset['lat'].shape

    # Assign elevation as a new coordinate in the dataset
    xarray_dataset = xarray_dataset.assign_coords(
        elevation=(['i', 'j'], elevation_df['elevation'].values.reshape(i_dim, j_dim))
    )

    return xarray_dataset

createElevationSTL(minx, miny, maxx, maxy, dxdy=30, shiftx=0, shifty=0, inputCRS=WSG84, dataSourceName=None, solidName='Topography')

Return the STL string from xarray dataset with the following fields:

Parameters:

Name Type Description Default
lowerleft_point float
The lower left corner
required
upperright_point
The upper right corner
required
dxdy float
The resolution in m (default m).
30
inputCRS The ESPG code of the input projection.
WSG84
outputCRS The ESPG code of the output projection.
    [Default ITM]
required
shiftx Used when one wants to set another point as origin center
0
shifty Used when one wants to set another point as origin center
0
Source code in hera/measurements/GIS/raster/topography.py
def createElevationSTL(self, minx, miny, maxx, maxy, dxdy = 30,shiftx=0,shifty=0,inputCRS=WSG84, dataSourceName=None, solidName="Topography"):
    """
        Return the STL string from xarray dataset with the following fields:
    Parameters
    ----------
    lowerleft_point : float
            The lower left corner
    upperright_point: float
            The upper right corner

    dxdy : float
            The resolution in m (default m).
    inputCRS : The ESPG code of the input projection.

    outputCRS : The ESPG code of the output projection.
                [Default ITM]

    shiftx : Used when one wants to set another point as origin center

    shifty : Used when one wants to set another point as origin center

    Returns
    -------

    """
    elevation = self.getElevation(minx=minx,miny=miny, maxx=maxx, maxy=maxy, dxdy=dxdy, inputCRS=inputCRS, dataSourceName=dataSourceName)

    return self.getElevationSTL(elevation,shiftx,shifty,solidName)

getElevationSTL(elevation, shiftx=0, shifty=0, solidName='Topography')

Generates STL string from elevation dataset.

Parameters:

Name Type Description Default
elevation Dataset

Dataset with 'lat', 'lon' and 'elevation' coordinates.

required
shiftx float

Optional shifts in X and Y.

0
shifty float

Optional shifts in X and Y.

0
solidName str

Name of the STL solid.

'Topography'

Returns:

Type Description
str

STL content as string.

Notes

🔧 FIXED: Accessed shape using elevation['elevation'].shape instead of elevation.shape because xarray.Dataset has no attribute 'shape'.

Source code in hera/measurements/GIS/raster/topography.py
def getElevationSTL(self,elevation,shiftx=0,shifty=0,solidName="Topography"):
    """
    Generates STL string from elevation dataset.

    Parameters
    ----------
    elevation : xarray.Dataset
        Dataset with 'lat', 'lon' and 'elevation' coordinates.
    shiftx, shifty : float
        Optional shifts in X and Y.
    solidName : str
        Name of the STL solid.

    Returns
    -------
    str
        STL content as string.

    Notes
    -----
    🔧 FIXED: Accessed shape using elevation['elevation'].shape instead of elevation.shape
    because xarray.Dataset has no attribute 'shape'.
    """
    grid_points = pd.DataFrame({
        'x': elevation['lat'].values.flatten(),
        'y': elevation['lon'].values.flatten()
    })
    gdf = gpd.GeoDataFrame(
        grid_points,
        geometry=gpd.points_from_xy(grid_points['y'], grid_points['x']),
        crs=WSG84
    )
    gdf_transformed = gdf.to_crs(ITM)
    gdf_transformed['y'] = gdf_transformed.geometry.y
    gdf_transformed['x'] = gdf_transformed.geometry.x

    # 🔧 FIX: Use actual shape from elevation['elevation'] instead of elevation.shape
    i_dim, j_dim = elevation['elevation'].shape
    xx = gdf_transformed['x'].values.reshape(i_dim, j_dim)
    yy = gdf_transformed['y'].values.reshape(i_dim, j_dim)
    stlstr = stlFactory().rasterToSTL(xx - shiftx, yy - shifty, elevation['elevation'].values, solidName=solidName)
    return stlstr

LandCoverToolkit

hera.measurements.GIS.raster.landcover.LandCoverToolkit

Bases: abstractToolkit

Toolkit for land cover data retrieval, roughness calculation, and mapping.

Source code in hera/measurements/GIS/raster/landcover.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
class LandCoverToolkit(toolkit.abstractToolkit):
    """Toolkit for land cover data retrieval, roughness calculation, and mapping."""
    # """
    # We should use different roughness to buildings and to topo
    #
    #  MCD12Q1 - MODIS/Terra+Aqua Land Cover Type Yearly L3 Global 500m SIN Grid
    #
    #   data is from https://zenodo.org/records/8367523
    #   type are in https://ladsweb.modaps.eosdis.nasa.gov/filespec/MODIS/6/MCD12Q1
    #
    #   Land Cover Type 1: IGBP global vegetation classification scheme
    #   Land Cover Type 2: University of Maryland (UMD) scheme
    #
    #   DataField    Name                    Data      Dimensions
    #                                        Type
    #
    #   DataField_1  Land_Cover_Type_1       UINT8     Dimension_1
    #                                                  Dimension_2
    #
    #           Description:    land cover types (IGBP)
    #           DataField_1 HDF Attributes:
    #              long_name      STRING  1   PGE    "Land_Cover_Type_1"
    #              units          STRING  1   PGE     "class number"
    #              valid_range    uint8    2   PGE     0 16
    #              _FillValue     uint8    1   PGE     255
    #              Water          uint8    1   PGE     0
    #              Evergreen needleleaf forest
    #                             uint8    1   PGE     1
    #              Evergreen broadleaf forest
    #                             uint8    1   PGE     2
    #              Deciduous needleleaf forest
    #                             uint8    1   PGE     3
    #              Deciduous broadleaf forest
    #                             uint8    1   PGE     4
    #              Mixed forests  unit8    1   PGE     5
    #              Closed shrubland
    #                             unit8    1   PGE     6
    #              Open shrublands
    #                             unit8    1   PGE     7
    #              Woody savannas unit8    1   PGE     8
    #              Savannas       unit8    1   PGE     9
    #              Grasslands     unit8    1   PGE    10
    #              Permanent wetlands
    #                             unit8    1   PGE    11
    #              Croplands      unit8    1   PGE    12
    #              Urban and built-up
    #                             unit8    1   PGE    13
    #              Cropland/natural vegetation mosaic
    #                             unit8    1   PGE    14
    #              Snow and ice   unit8    1   PGE    15
    #              Barren or sparsely vegetated
    #                             unit8    1   PGE    16
    #
    #   DataField_2  Land_Cover_Type_2       UINT8     Dimension_1
    #                                                  Dimension_2
    #
    #           Description:    land cover types (UMD)
    #           DataField_2 HDF Attributes:
    #              long_name      STRING   1   PGE    "Land_Cover_Type_2"
    #              units          STRING   1   PGE     "class number"
    #              valid_range    uint8    2   PGE     0 16
    #              _FillValue     uint8    1   PGE     255
    #              Water          uint8    1   PGE     0
    #              Evergreen needleleaf forest
    #                             uint8    1   PGE     1
    #              Evergreen broadleaf forest
    #                             uint8    1   PGE     2
    #              Deciduous needleleaf forest
    #                             uint8    1   PGE     3
    #              Deciduous broadleaf forest
    #                             uint8    1   PGE     4
    #              Mixed forests  uint8    1   PGE     5
    #              Closed shrublands
    #                              unit8   1   PGE     6
    #              Open shrubland  unit8   1   PGE     7
    #              Woody savannas  unit8    1   PGE     8
    #              Savannas       unit8    1   PGE     9
    #              Grasslands     uint8    1   PGE    10
    #              Croplands      uint8    1   PGE    12
    #              Urban and built-up
    #                             uint8    1   PGE    13
    #              Barren or sparsely vegetated
    #                             unit8    1   PGE    16
    #
    #   Land cover may change between winter and summer
    #   urban area may change over the years
    # """

    def __init__(self, projectName, filesDirectory=None,connectionName=None):
        """
        Initializes land cover data toolkit.

        Parameters
        ----------
        projectName: str
            The project Name that the toolkit is initialized on
        toolkitName: str
            the specific toolkit, getting from the child.

        FilesDirectory: str or None
            The path to save a regions files when they are created.
            if str then represents a path (relative or absolute) to save the files in. The directory is created automatically.
            if None, then tries to get the default path of the project from the config. if it does not
            exist, then use the current directory.
        """
        super().__init__(projectName=projectName, toolkitName='LandCoverToolkit', filesDirectory=filesDirectory,connectionName=connectionName)
        self._presentation = presentation(dataLayer=self)



    def getLandCoverAtPoint(self, lon, lat, inputCRS=WSG84, dataSourceName=None):
        """
        Get the landcover type integer value in a specific point.

        Parameters
        ----------
        lon : float
            The longitude coordinate.

        lat : float
            The latitude coordinate.

        inputCRS : int, default=WSG84
            The EPSG of the input coordinates.

        dataSourceName : string, default=None
            The name or path of the data source to use.

        Returns
        -------
        int
            Land cover integer value at the point.
        """

        # קבע שם ברירת מחדל אם לא ניתן
        dataSourceName = self.getConfig()['defaultLandCover'] if dataSourceName is None else dataSourceName

        # נסה להשיג את ה-DataSource דרך הפונקציה הקיימת
        ds = self.getDataSourceData(dataSourceName)

        # If the datasource stores a file path (dataFormat="string"), open it with rasterio
        if isinstance(ds, str) and os.path.isfile(ds):
            ds = rasterio.open(ds)
        elif ds is None:
            if isinstance(dataSourceName, str) and os.path.isfile(dataSourceName):
                ds = rasterio.open(dataSourceName)
            else:
                raise ValueError(f"Could not load data source: {dataSourceName}")

        # המר קואורדינטות אם צריך
        if inputCRS != ds.crs.to_epsg():
            transformer = Transformer.from_crs(inputCRS, ds.crs.to_epsg(), always_xy=True)
            lon, lat = transformer.transform(lon, lat)

        # קריאה של התמונה והמידע המרחבי
        img = ds.read(1)
        gt = ds.transform
        width = ds.width
        height = ds.height

        # חישוב אינדקסים של פיקסל
        x = math.floor((lat - gt.f) / gt.e)
        y = math.floor((lon - gt.c) / gt.a)

        # בדיקה שהפיקסל בטווח
        if x < 0 or x >= img.shape[0] or y < 0 or y >= img.shape[1]:
            raise IndexError(f"Point ({lon},{lat}) is out of raster bounds.")

        return int(img[x, y])


    def getLandCover(self, minx, miny, maxx, maxy, dxdy=30, inputCRS=WSG84, dataSourceName=None):
        """
        Get Xarray LandCover map.

        Parameters
        ----------
        minx : float
            Minimum value of X axis. If using WSG84 - the minimum Latitude coordinates.

        miny : float
            Minimum value of Y axis. If using WSG84 - the minimum Longitude coordinates.

        maxx: float
            Maximum value of X axis. If using WSG84 - the maximum Latitude coordinates.

        maxy: float
            Maximum value of Y axis. If using WSG84 - the maximum Longitude coordinates.

        dxdy: int, default=30
            Spatial resolution of the output land cover map.

        inputCRS : int, default=WSG84
            The EPSG of the coordinates.

        dataSourceName : string, default=None
            The name of the data source to use.

        Returns
        -------
        xarray.DataArray
        """

        def vectorizeLandCoverCalc(lat, lon, img, lonUpperLeft, lonResolution, latUpperLeft, latResolution):
            """Return the landcover pixel value at a given lat/lon coordinate."""
            ilat = math.floor((lat - latUpperLeft) / latResolution)
            ilon = math.floor((lon - lonUpperLeft) / lonResolution)
            return img[ilat, ilon]

        xarray_dataset = create_xarray(minx, miny, maxx, maxy, dxdy, inputCRS)
        dataSourceName = self.getConfig()['defaultLandCover'] if dataSourceName is None else dataSourceName

        # [ADD] Handle raw file path directly if given
        if isinstance(dataSourceName, str) and os.path.isfile(dataSourceName):
            with rasterio.open(dataSourceName) as src:
                img = src.read(1)
                lonUpperLeft, latUpperLeft = src.transform[2], src.transform[5]
                lonResolution, latResolution = src.transform[0], src.transform[4]
        else:
            ds = self.getDataSourceData(dataSourceName)
            if isinstance(ds, str) and os.path.isfile(ds):
                # dataFormat="string" returns the file path; open with rasterio
                with rasterio.open(ds) as src:
                    img = src.read(1)
                    lonUpperLeft, latUpperLeft = src.transform[2], src.transform[5]
                    lonResolution, latResolution = src.transform[0], src.transform[4]
            elif hasattr(ds, "GetRasterBand"):
                img = ds.GetRasterBand(1).ReadAsArray()
                lonUpperLeft, lonResolution, lonRotation, latUpperLeft, latRotation, latResolution = ds.GetGeoTransform()
            else:
                img = ds.read(1)
                lonUpperLeft, latUpperLeft = ds.transform[2], ds.transform[5]
                lonResolution, latResolution = ds.transform[0], ds.transform[4]

        vectorizedLandCover = numpy.vectorize(lambda tlat, tlon: vectorizeLandCoverCalc(
            lat=tlat,
            lon=tlon,
            img=img,
            lonUpperLeft=lonUpperLeft,
            lonResolution=lonResolution,
            latUpperLeft=latUpperLeft,
            latResolution=latResolution
        ))

        landcover = vectorizedLandCover(xarray_dataset.lat.values, xarray_dataset.lon.values)
        xarray_dataset = xarray_dataset.assign_coords(landcover=(('i', 'j'), landcover))
        return xarray_dataset

    def getRoughnessAtPoint(self, lon, lat, inputCRS=WSG84, dataSourceName=None):
        """
        Get the roughness value of a specific point in the map.

        Parameters
        ----------
        lon : float
            The longitude coordinates.

        lat : float
            The latitude coordinates.

        inputCRS : int, default=WSG84
            The EPSG of the coordinates.

        dataSourceName : string, default=None
            The name of the data source to use.

        Returns
        -------
        float
        """
        dataSourceName = self.getConfig()['defaultLandCover'] if dataSourceName is None else dataSourceName

        try:
            datasourceDocument = self.getDataSourceDocument(dataSourceName)
            if datasourceDocument and 'desc' in datasourceDocument:
                type_name = datasourceDocument['desc'].get('type', None)
                if not type_name and 'desc' in datasourceDocument['desc']:
                    type_name = datasourceDocument['desc']['desc'].get('type', None)
            else:
                type_name = None
        except Exception:
            datasourceDocument = None
            type_name = None

        if type_name is None:
            print("[WARNING] No metadata available for data source. Using default type: IGBP.")
            type_name = "IGBP"

        landcover = self.getLandCoverAtPoint(lon=lon, lat=lat, inputCRS=inputCRS, dataSourceName=dataSourceName)

        if type_name == "IGBP":
            roughness_mapping = {
                0: 0.01,  # Example values
                1: 0.1,
                2: 0.15,
                3: 0.2,
                4: 0.25,
                5: 0.3,
                6: 0.4,
                7: 0.5,
                8: 0.6,
                9: 0.7,
                10: 0.8,
                11: 0.9,
                12: 1.0,
                13: 1.1,
                14: 1.2,
                15: 1.3,
                16: 1.4
            }
            return float(roughness_mapping.get(landcover, 0.05))  # ברירת מחדל אם אין התאמה
        else:
            handlerFunction = getattr(self, f"_handleType{type_name}")
            return handlerFunction(landcover)

    def getRoughnessFromLandcover(self, landcover, windMeteorologicalDirection=None, resolution=None, isBuilding=False,
                                  dataSourceName=None, GIS_BUILDINGS_dataSourceName=None):
        """
        Adds Roughness field (z0) to landcover Xarray.

        Parameters
        ----------
        landcover : Xarray
            Landcover Xarray map in which the Roughness field will be added to.

        windMeteorologicalDirection: double, default=None
            The meteorological angle. Must be specified only if data includes urbanic area.

        resolution: double, default=None
            The size of the squares. Must be specified only if data includes urbanic area.

        isBuilding : bool, default=False
            Is the landcover contains urbanic area.

        dataSourceName : string, default=None
            The name of the data source to use.

        GIS_BUILDINGS_dataSourceName: string, default=None
            The name of the GIS Buildings datasource name. Relevant if landcover contains Urban areas.

        Returns
        -------
        xarray.DataArray
        """
        dataSourceName = self.getConfig()['defaultLandCover'] if dataSourceName is None else dataSourceName

        try:
            datasourceDocument = self.getDataSourceDocument(dataSourceName)
        except Exception:
            datasourceDocument = None

        if not isBuilding:
            # במקרה שאין מטא־דאטא או שאין מידע שימושי בו, נ fallback לסוג 'IGBP'
            if datasourceDocument and 'desc' in datasourceDocument:
                desc = datasourceDocument['desc']
                type_name = desc.get('type') or (desc.get('desc', {}).get('type') if isinstance(desc, dict) else None)
            else:
                type_name = "IGBP"
                print("[WARNING] No metadata available for data source. Using default type: IGBP.")

            handler_name = f"_handleType{type_name}"

            # fallback דינמי: אם אין handler כזה, נשתמש בזה של IGBP המובנה
            handlerFunction = getattr(self, handler_name, None)
            if handlerFunction is None and type_name == "IGBP":
                handlerFunction = lambda lc_value: {
                    0: 0.01, 1: 0.02, 2: 0.05, 3: 0.1, 4: 0.15,
                    5: 0.2, 6: 0.25, 7: 0.3, 8: 0.35, 9: 0.4,
                    10: 0.45, 11: 0.5, 12: 0.55, 13: 0.6,
                    14: 0.01, 15: 0.001, 16: 0.0001
                }.get(int(lc_value), 0.1)
            elif handlerFunction is None:
                raise AttributeError(f"Handler function '{handler_name}' not found in LandCoverToolkit.")

            # Apply vectorized roughness handler
            roughness_values = np.vectorize(handlerFunction)(landcover['landcover'])
            landcover = landcover.assign_coords(z0=(['i', 'j'], roughness_values))

        else:
            if windMeteorologicalDirection is None or resolution is None:
                raise ValueError(
                    "windMeteorologicalDirection and resolution must be specified for calculating urban area.")
            landcover = self._getUrbanRoughnessFromLandCover(
                landcover,
                windMeteorologicalDirection,
                resolution,
                dataSourceName,
                GIS_BUILDINGS_dataSourceName
            )

        return landcover

    def getRoughness(self, minx, miny, maxx, maxy, dxdy=30, inputCRS=WSG84, dataSourceName=None, isBuilding=False,
                     windMeteorologicalDirection=None, resolution=None, GIS_BUILDINGS_dataSourceName=None):
        """
        Returns Xarray LandCover map with Roughness (zo) field.
        Just as applying getLandCover and getRoughnessFromLandcover at the same time.

        Parameters
        ----------
        minx : float
            Minimum value of X axis. If using WSG84 - the minimum Latitude coordinates.

        miny : float
            Minimum value of Y axis. If using WSG84 - the minimum Longitude coordinates.

        maxx : float
            Maximum value of X axis. If using WSG84 - the maximum Latitude coordinates.

        maxy : float
            Maximum value of Y axis. If using WSG84 - the maximum Longitude coordinates.

        dxdy : int, default=30
            Spatial resolution of the output land cover map.

        inputCRS : int, default=WSG84
            The EPSG of the coordinates.

        dataSourceName : string, default=None
            The name of the data source to use.

        isBuilding : bool, default=False
            Is the landcover contains building area.

        windMeteorologicalDirection: double, default=None
            The meteorological angle. Must be specified only if data includes urbanic area.

        resolution: double, default=None
            The size of the squares. Must be specified only if data includes urbanic area.

        Returns
        -------
        xarray.DataArray
        """
        landcover = self.getLandCover(minx, miny, maxx, maxy, dxdy=dxdy, inputCRS=inputCRS,
                                      dataSourceName=dataSourceName)
        landcover = self.getRoughnessFromLandcover(
            landcover,
            windMeteorologicalDirection=windMeteorologicalDirection,
            resolution=resolution,
            isBuilding=isBuilding,
            dataSourceName=dataSourceName,
            GIS_BUILDINGS_dataSourceName=GIS_BUILDINGS_dataSourceName
        )
        return landcover

    def _handleType1(self, landcover):
        """
        Converting land type of Type-1 to roughness.
        Based on the paper:
            * https://wes.copernicus.org/articles/6/1379/2021/ table a2
            * https://doi.org/10.5194/wes-6-1379-2021 Satellite-based estimation of roughness lengths and displacement heights for wind resource modelling, Rogier Floors, Merete Badger, Ib Troen, Kenneth Grogan, and Finn-Hendrik Permien

        Parameters
        ----------
        landcover : int
            Landcover type value.

        Returns
        -------
        float
        """
        roughnessDict = {
            0: 0.0001,  # Water
            1: 1,  # Evergreen needleleaf forest
            2: 1,  # Evergreen broadleaf forest
            3: 1,  # Deciduous needleleaf forest
            4: 1,  # Deciduous broadleaf forest
            5: 1,  # Mixed forests
            6: 0.05,  # Closed shrubland
            7: 0.06,  # Open shrublands
            8: 0.05,  # Woody savannas
            9: 0.15,  # Savannas
            10: 0.12,  # Grasslands
            11: 0.3,  # Permanent wetlands
            12: 0.15,  # Croplands
            13: 0.8,  # Urban and built-up
            14: 0.14,  # Cropland/natural vegetation mosaic
            15: 0.001,  # Snow and ice
            16: 0.01  # Barren or sparsely vegetated
        }
        return roughnessDict.get(landcover, 0.05)

    def getCodingMap(self, datasourceName):
        """
        Returns dictionary that maps landcover int value to string of landcover.

        Parameters
        ----------
        datasourceName : str
            Datasource type (Type-1, Type-2 etc.)

        Returns
        -------
        dict
        """
        if datasourceName == 'Type-1':
            return {
                0: "Water",
                1: "Evergreen needleleaf forest",
                2: "Evergreen broadleaf forest",
                3: "Deciduous needleleaf forest",
                4: "Deciduous broadleaf forest",
                5: "Mixed forests",
                6: "Closed shrubland",
                7: "Open shrublands",
                8: "Woody savannas",
                9: "Savannas",
                10: "Grasslands",
                11: "Permanent wetlands",
                12: "Croplands",
                13: "Urban and built-up",
                14: "Cropland/natural vegetation mosaic",
                15: "Snow and ice",
                16: "Barren or sparsely vegetated"
            }
        return {}

    @staticmethod
    def roughnesslength2sandgrainroughness(rl):
        """
        Converts roughness length to equivalent sand grain roughness.

        Based on:
        Desmond, C. J., Watson, S. J., & Hancock, P. E. (2017).
        Modelling the wind energy resource in complex terrain and atmospheres.
        Numerical simulation and wind tunnel investigation of non-neutral forest canopy flow.
        Journal of wind engineering and industrial aerodynamics, 166, 48-60.
        https://www.sciencedirect.com/science/article/pii/S0167610516300083#bib12

        Equation 5: Equivalent sand grain roughness (m) is z0 * 30

        We can use it for "nutkRoughWallFunction" boundary condition for Ks (sand grain roughness) parameter.
        Cs value can be set as 0.5.

        Parameters
        ----------
        rl : float
            Roughness length.

        Returns
        -------
        float
            Equivalent sand grain roughness (Ks).
        """
        return rl * 30.0  # return Ks value

    def _getUrbanRoughnessFromLandCover(self, landcover, windMeteorologicalDirection, resolution, dataSourceName, GIS_BUILDINGS_dataSourceName):
        """
        Add Roughness for Urban areas to landcover Xarray.
        z0 and dd fields are calculated from LambdaP, LambdaF and HC of each Urban area.

        Parameters
        ----------
        landcover : xarray.DataArray
            Landcover data to update.

        windMeteorologicalDirection : float
            Meteorological wind direction for building analysis.

        resolution : float
            Resolution to calculate Lambda values.

        dataSourceName : str
            Data source name.

        GIS_BUILDINGS_dataSourceName : str
            Buildings data source name.

        Returns
        -------
        xarray.DataArray
            Updated landcover with urban roughness parameters.
        """

        # [FIX] Import geopandas inside if needed
        import geopandas as gpd

        gis_building_tk = toolkitHome.getToolkit(toolkitName=toolkitHome.GIS_BUILDINGS, projectName=self.projectName)

        min_geom = convertCRS(
            points=[[float(min(landcover.lon.values.flatten())), float(min(landcover.lat.values.flatten()))]],
            inputCRS=WSG84, outputCRS=ITM
        )[0]
        max_geom = convertCRS(
            points=[[float(max(landcover.lon.values.flatten())), float(max(landcover.lat.values.flatten()))]],
            inputCRS=WSG84, outputCRS=ITM
        )[0]

        # [FIX] Wrap to GeoDataFrame to ensure access to x,y
        min_pp = gpd.GeoDataFrame(geometry=[min_geom], crs=ITM)
        max_pp = gpd.GeoDataFrame(geometry=[max_geom], crs=ITM)

        buildings = gis_building_tk.getBuildingsFromRectangle(
            minx=min_pp.geometry[0].x,
            miny=min_pp.geometry[0].y,
            maxx=max_pp.geometry[0].x,
            maxy=max_pp.geometry[0].y,
            dataSourceName=GIS_BUILDINGS_dataSourceName,
            inputCRS=ITM
        )

        if len(buildings) == 0:
            raise ValueError("Buildings DataFrame for specified coordinates is empty.")

        lambdaGrid = gis_building_tk.analysis.LambdaFromBuildingData(
            windMeteorologicalDirection,
            resolution,
            buildings
        )
        lambdaGrid.crs = ITM
        lambdaGrid = self._getRoughnessFromBuildingsDataFrame(lambdaGrid)

        square_size = float(landcover.dxdy.values)

        # [FIX] Initialize empty fields with NaN
        for field in ['z0', 'dd', 'lambdaF', 'lambdaP', 'hc', 'll']:
            landcover[field] = (('i', 'j'), np.full((landcover.sizes['i'], landcover.sizes['j']), np.nan))

        # [FIX] Efficient loops using tqdm
        for i in tqdm(range(landcover.sizes['i']), desc="Urban roughness calculation"):
            for j in range(landcover.sizes['j']):
                lon = landcover.lon.values[i, j]
                lat = landcover.lat.values[i, j]

                point_geom = convertCRS(points=[[lon, lat]], inputCRS=WSG84, outputCRS=ITM)[0]
                point = gpd.GeoDataFrame(geometry=[point_geom], crs=ITM)

                lambdas = lambdaGrid.loc[lambdaGrid.geometry.intersects(point.geometry[0])]

                if len(lambdas) == 0:
                    landcover['z0'].values[i, j] = self.getLandCoverAtPoint(lon, lat, WSG84, dataSourceName)
                    landcover['dd'].values[i, j] = 0
                    landcover['lambdaF'].values[i, j] = 0
                    landcover['lambdaP'].values[i, j] = 0
                    landcover['hc'].values[i, j] = 0
                    landcover['ll'].values[i, j] = 0
                else:
                    landcover['z0'].values[i, j] = lambdas['zz0'].values[0]
                    landcover['dd'].values[i, j] = lambdas['dd'].values[0]
                    landcover['lambdaF'].values[i, j] = lambdas['lambdaF'].values[0]
                    landcover['lambdaP'].values[i, j] = lambdas['lambdaP'].values[0]
                    landcover['hc'].values[i, j] = lambdas['hc'].values[0]
                    landcover['ll'].values[i, j] = lambdas['ll'].values[0]

        return landcover


    def _getRoughnessFromBuildingsDataFrame(self,lambdaGrid):
        """Compute roughness parameters from a buildings lambda GeoDataFrame.

        Parameters
        ----------
        lambdaGrid : geopandas.GeoDataFrame
            DataFrame with lambdaF, lambdaP, and hc columns.

        Returns
        -------
        geopandas.GeoDataFrame
            Updated DataFrame with z0, dd, Lc, and ll columns.
        """
        lambdaGrid.loc[(lambdaGrid.hc < 2), "lambdaF"] = 0.25
        lambdaGrid.loc[(lambdaGrid.hc < 2), "lambdaP"] = 0.25
        lambdaGrid.loc[(lambdaGrid.hc < 2), "hc"] = 2
        lambdaGrid.loc[(lambdaGrid.lambdaF > 0.4), "lambdaF"] = 0.4
        lambdaGrid.loc[(lambdaGrid.lambdaP > 0.6), "lambdaP"] = 0.6
        lambdaGrid["Lc"] = lambdaGrid["hc"] * (1 - lambdaGrid["lambdaP"]) / lambdaGrid["lambdaF"]
        lambdaGrid["ll"] = 2 * (BETA ** 3) * lambdaGrid["Lc"]
        lambdaGrid["zz0"] = lambdaGrid["ll"] / KARMAN * np.exp(-KARMAN / BETA)
        lambdaGrid["dd"] = lambdaGrid["ll"] / KARMAN
        return lambdaGrid



    @staticmethod
    def roughnesslength2sandgrainroughness(rl):
        """Convert roughness length to equivalent sand grain roughness (Ks = z0 * 30)."""
    #Desmond, C. J., Watson, S. J., & Hancock, P. E. (2017). Modelling the wind energy resource in complex terrain and atmospheres. Numerical simulation and wind tunnel investigation of non-neutral forest canopy flow. Journal of wind engineering and industrial aerodynamics, 166, 48-60.‏
    # https://www.sciencedirect.com/science/article/pii/S0167610516300083#bib12
    # eq. 5: Equivalent sand grain roughness (m) is z0*30

    # we can you it for "nutkRoughWallFunction" boundary condition for Ks (sand grain roughness) parameter
    # Cs value can be set as 0.5

        return rl*30.0 # return Ks value

__init__(projectName, filesDirectory=None, connectionName=None)

Initializes land cover data toolkit.

Parameters:

Name Type Description Default
projectName

The project Name that the toolkit is initialized on

required
toolkitName

the specific toolkit, getting from the child.

required
FilesDirectory

The path to save a regions files when they are created. if str then represents a path (relative or absolute) to save the files in. The directory is created automatically. if None, then tries to get the default path of the project from the config. if it does not exist, then use the current directory.

required
Source code in hera/measurements/GIS/raster/landcover.py
def __init__(self, projectName, filesDirectory=None,connectionName=None):
    """
    Initializes land cover data toolkit.

    Parameters
    ----------
    projectName: str
        The project Name that the toolkit is initialized on
    toolkitName: str
        the specific toolkit, getting from the child.

    FilesDirectory: str or None
        The path to save a regions files when they are created.
        if str then represents a path (relative or absolute) to save the files in. The directory is created automatically.
        if None, then tries to get the default path of the project from the config. if it does not
        exist, then use the current directory.
    """
    super().__init__(projectName=projectName, toolkitName='LandCoverToolkit', filesDirectory=filesDirectory,connectionName=connectionName)
    self._presentation = presentation(dataLayer=self)

getLandCoverAtPoint(lon, lat, inputCRS=WSG84, dataSourceName=None)

Get the landcover type integer value in a specific point.

Parameters:

Name Type Description Default
lon float

The longitude coordinate.

required
lat float

The latitude coordinate.

required
inputCRS int

The EPSG of the input coordinates.

WSG84
dataSourceName string

The name or path of the data source to use.

None

Returns:

Type Description
int

Land cover integer value at the point.

Source code in hera/measurements/GIS/raster/landcover.py
def getLandCoverAtPoint(self, lon, lat, inputCRS=WSG84, dataSourceName=None):
    """
    Get the landcover type integer value in a specific point.

    Parameters
    ----------
    lon : float
        The longitude coordinate.

    lat : float
        The latitude coordinate.

    inputCRS : int, default=WSG84
        The EPSG of the input coordinates.

    dataSourceName : string, default=None
        The name or path of the data source to use.

    Returns
    -------
    int
        Land cover integer value at the point.
    """

    # קבע שם ברירת מחדל אם לא ניתן
    dataSourceName = self.getConfig()['defaultLandCover'] if dataSourceName is None else dataSourceName

    # נסה להשיג את ה-DataSource דרך הפונקציה הקיימת
    ds = self.getDataSourceData(dataSourceName)

    # If the datasource stores a file path (dataFormat="string"), open it with rasterio
    if isinstance(ds, str) and os.path.isfile(ds):
        ds = rasterio.open(ds)
    elif ds is None:
        if isinstance(dataSourceName, str) and os.path.isfile(dataSourceName):
            ds = rasterio.open(dataSourceName)
        else:
            raise ValueError(f"Could not load data source: {dataSourceName}")

    # המר קואורדינטות אם צריך
    if inputCRS != ds.crs.to_epsg():
        transformer = Transformer.from_crs(inputCRS, ds.crs.to_epsg(), always_xy=True)
        lon, lat = transformer.transform(lon, lat)

    # קריאה של התמונה והמידע המרחבי
    img = ds.read(1)
    gt = ds.transform
    width = ds.width
    height = ds.height

    # חישוב אינדקסים של פיקסל
    x = math.floor((lat - gt.f) / gt.e)
    y = math.floor((lon - gt.c) / gt.a)

    # בדיקה שהפיקסל בטווח
    if x < 0 or x >= img.shape[0] or y < 0 or y >= img.shape[1]:
        raise IndexError(f"Point ({lon},{lat}) is out of raster bounds.")

    return int(img[x, y])

getLandCover(minx, miny, maxx, maxy, dxdy=30, inputCRS=WSG84, dataSourceName=None)

Get Xarray LandCover map.

Parameters:

Name Type Description Default
minx float

Minimum value of X axis. If using WSG84 - the minimum Latitude coordinates.

required
miny float

Minimum value of Y axis. If using WSG84 - the minimum Longitude coordinates.

required
maxx

Maximum value of X axis. If using WSG84 - the maximum Latitude coordinates.

required
maxy

Maximum value of Y axis. If using WSG84 - the maximum Longitude coordinates.

required
dxdy

Spatial resolution of the output land cover map.

30
inputCRS int

The EPSG of the coordinates.

WSG84
dataSourceName string

The name of the data source to use.

None

Returns:

Type Description
DataArray
Source code in hera/measurements/GIS/raster/landcover.py
def getLandCover(self, minx, miny, maxx, maxy, dxdy=30, inputCRS=WSG84, dataSourceName=None):
    """
    Get Xarray LandCover map.

    Parameters
    ----------
    minx : float
        Minimum value of X axis. If using WSG84 - the minimum Latitude coordinates.

    miny : float
        Minimum value of Y axis. If using WSG84 - the minimum Longitude coordinates.

    maxx: float
        Maximum value of X axis. If using WSG84 - the maximum Latitude coordinates.

    maxy: float
        Maximum value of Y axis. If using WSG84 - the maximum Longitude coordinates.

    dxdy: int, default=30
        Spatial resolution of the output land cover map.

    inputCRS : int, default=WSG84
        The EPSG of the coordinates.

    dataSourceName : string, default=None
        The name of the data source to use.

    Returns
    -------
    xarray.DataArray
    """

    def vectorizeLandCoverCalc(lat, lon, img, lonUpperLeft, lonResolution, latUpperLeft, latResolution):
        """Return the landcover pixel value at a given lat/lon coordinate."""
        ilat = math.floor((lat - latUpperLeft) / latResolution)
        ilon = math.floor((lon - lonUpperLeft) / lonResolution)
        return img[ilat, ilon]

    xarray_dataset = create_xarray(minx, miny, maxx, maxy, dxdy, inputCRS)
    dataSourceName = self.getConfig()['defaultLandCover'] if dataSourceName is None else dataSourceName

    # [ADD] Handle raw file path directly if given
    if isinstance(dataSourceName, str) and os.path.isfile(dataSourceName):
        with rasterio.open(dataSourceName) as src:
            img = src.read(1)
            lonUpperLeft, latUpperLeft = src.transform[2], src.transform[5]
            lonResolution, latResolution = src.transform[0], src.transform[4]
    else:
        ds = self.getDataSourceData(dataSourceName)
        if isinstance(ds, str) and os.path.isfile(ds):
            # dataFormat="string" returns the file path; open with rasterio
            with rasterio.open(ds) as src:
                img = src.read(1)
                lonUpperLeft, latUpperLeft = src.transform[2], src.transform[5]
                lonResolution, latResolution = src.transform[0], src.transform[4]
        elif hasattr(ds, "GetRasterBand"):
            img = ds.GetRasterBand(1).ReadAsArray()
            lonUpperLeft, lonResolution, lonRotation, latUpperLeft, latRotation, latResolution = ds.GetGeoTransform()
        else:
            img = ds.read(1)
            lonUpperLeft, latUpperLeft = ds.transform[2], ds.transform[5]
            lonResolution, latResolution = ds.transform[0], ds.transform[4]

    vectorizedLandCover = numpy.vectorize(lambda tlat, tlon: vectorizeLandCoverCalc(
        lat=tlat,
        lon=tlon,
        img=img,
        lonUpperLeft=lonUpperLeft,
        lonResolution=lonResolution,
        latUpperLeft=latUpperLeft,
        latResolution=latResolution
    ))

    landcover = vectorizedLandCover(xarray_dataset.lat.values, xarray_dataset.lon.values)
    xarray_dataset = xarray_dataset.assign_coords(landcover=(('i', 'j'), landcover))
    return xarray_dataset

getRoughnessAtPoint(lon, lat, inputCRS=WSG84, dataSourceName=None)

Get the roughness value of a specific point in the map.

Parameters:

Name Type Description Default
lon float

The longitude coordinates.

required
lat float

The latitude coordinates.

required
inputCRS int

The EPSG of the coordinates.

WSG84
dataSourceName string

The name of the data source to use.

None

Returns:

Type Description
float
Source code in hera/measurements/GIS/raster/landcover.py
def getRoughnessAtPoint(self, lon, lat, inputCRS=WSG84, dataSourceName=None):
    """
    Get the roughness value of a specific point in the map.

    Parameters
    ----------
    lon : float
        The longitude coordinates.

    lat : float
        The latitude coordinates.

    inputCRS : int, default=WSG84
        The EPSG of the coordinates.

    dataSourceName : string, default=None
        The name of the data source to use.

    Returns
    -------
    float
    """
    dataSourceName = self.getConfig()['defaultLandCover'] if dataSourceName is None else dataSourceName

    try:
        datasourceDocument = self.getDataSourceDocument(dataSourceName)
        if datasourceDocument and 'desc' in datasourceDocument:
            type_name = datasourceDocument['desc'].get('type', None)
            if not type_name and 'desc' in datasourceDocument['desc']:
                type_name = datasourceDocument['desc']['desc'].get('type', None)
        else:
            type_name = None
    except Exception:
        datasourceDocument = None
        type_name = None

    if type_name is None:
        print("[WARNING] No metadata available for data source. Using default type: IGBP.")
        type_name = "IGBP"

    landcover = self.getLandCoverAtPoint(lon=lon, lat=lat, inputCRS=inputCRS, dataSourceName=dataSourceName)

    if type_name == "IGBP":
        roughness_mapping = {
            0: 0.01,  # Example values
            1: 0.1,
            2: 0.15,
            3: 0.2,
            4: 0.25,
            5: 0.3,
            6: 0.4,
            7: 0.5,
            8: 0.6,
            9: 0.7,
            10: 0.8,
            11: 0.9,
            12: 1.0,
            13: 1.1,
            14: 1.2,
            15: 1.3,
            16: 1.4
        }
        return float(roughness_mapping.get(landcover, 0.05))  # ברירת מחדל אם אין התאמה
    else:
        handlerFunction = getattr(self, f"_handleType{type_name}")
        return handlerFunction(landcover)

getRoughnessFromLandcover(landcover, windMeteorologicalDirection=None, resolution=None, isBuilding=False, dataSourceName=None, GIS_BUILDINGS_dataSourceName=None)

Adds Roughness field (z0) to landcover Xarray.

Parameters:

Name Type Description Default
landcover Xarray

Landcover Xarray map in which the Roughness field will be added to.

required
windMeteorologicalDirection

The meteorological angle. Must be specified only if data includes urbanic area.

None
resolution

The size of the squares. Must be specified only if data includes urbanic area.

None
isBuilding bool

Is the landcover contains urbanic area.

False
dataSourceName string

The name of the data source to use.

None
GIS_BUILDINGS_dataSourceName

The name of the GIS Buildings datasource name. Relevant if landcover contains Urban areas.

None

Returns:

Type Description
DataArray
Source code in hera/measurements/GIS/raster/landcover.py
def getRoughnessFromLandcover(self, landcover, windMeteorologicalDirection=None, resolution=None, isBuilding=False,
                              dataSourceName=None, GIS_BUILDINGS_dataSourceName=None):
    """
    Adds Roughness field (z0) to landcover Xarray.

    Parameters
    ----------
    landcover : Xarray
        Landcover Xarray map in which the Roughness field will be added to.

    windMeteorologicalDirection: double, default=None
        The meteorological angle. Must be specified only if data includes urbanic area.

    resolution: double, default=None
        The size of the squares. Must be specified only if data includes urbanic area.

    isBuilding : bool, default=False
        Is the landcover contains urbanic area.

    dataSourceName : string, default=None
        The name of the data source to use.

    GIS_BUILDINGS_dataSourceName: string, default=None
        The name of the GIS Buildings datasource name. Relevant if landcover contains Urban areas.

    Returns
    -------
    xarray.DataArray
    """
    dataSourceName = self.getConfig()['defaultLandCover'] if dataSourceName is None else dataSourceName

    try:
        datasourceDocument = self.getDataSourceDocument(dataSourceName)
    except Exception:
        datasourceDocument = None

    if not isBuilding:
        # במקרה שאין מטא־דאטא או שאין מידע שימושי בו, נ fallback לסוג 'IGBP'
        if datasourceDocument and 'desc' in datasourceDocument:
            desc = datasourceDocument['desc']
            type_name = desc.get('type') or (desc.get('desc', {}).get('type') if isinstance(desc, dict) else None)
        else:
            type_name = "IGBP"
            print("[WARNING] No metadata available for data source. Using default type: IGBP.")

        handler_name = f"_handleType{type_name}"

        # fallback דינמי: אם אין handler כזה, נשתמש בזה של IGBP המובנה
        handlerFunction = getattr(self, handler_name, None)
        if handlerFunction is None and type_name == "IGBP":
            handlerFunction = lambda lc_value: {
                0: 0.01, 1: 0.02, 2: 0.05, 3: 0.1, 4: 0.15,
                5: 0.2, 6: 0.25, 7: 0.3, 8: 0.35, 9: 0.4,
                10: 0.45, 11: 0.5, 12: 0.55, 13: 0.6,
                14: 0.01, 15: 0.001, 16: 0.0001
            }.get(int(lc_value), 0.1)
        elif handlerFunction is None:
            raise AttributeError(f"Handler function '{handler_name}' not found in LandCoverToolkit.")

        # Apply vectorized roughness handler
        roughness_values = np.vectorize(handlerFunction)(landcover['landcover'])
        landcover = landcover.assign_coords(z0=(['i', 'j'], roughness_values))

    else:
        if windMeteorologicalDirection is None or resolution is None:
            raise ValueError(
                "windMeteorologicalDirection and resolution must be specified for calculating urban area.")
        landcover = self._getUrbanRoughnessFromLandCover(
            landcover,
            windMeteorologicalDirection,
            resolution,
            dataSourceName,
            GIS_BUILDINGS_dataSourceName
        )

    return landcover

getRoughness(minx, miny, maxx, maxy, dxdy=30, inputCRS=WSG84, dataSourceName=None, isBuilding=False, windMeteorologicalDirection=None, resolution=None, GIS_BUILDINGS_dataSourceName=None)

Returns Xarray LandCover map with Roughness (zo) field. Just as applying getLandCover and getRoughnessFromLandcover at the same time.

Parameters:

Name Type Description Default
minx float

Minimum value of X axis. If using WSG84 - the minimum Latitude coordinates.

required
miny float

Minimum value of Y axis. If using WSG84 - the minimum Longitude coordinates.

required
maxx float

Maximum value of X axis. If using WSG84 - the maximum Latitude coordinates.

required
maxy float

Maximum value of Y axis. If using WSG84 - the maximum Longitude coordinates.

required
dxdy int

Spatial resolution of the output land cover map.

30
inputCRS int

The EPSG of the coordinates.

WSG84
dataSourceName string

The name of the data source to use.

None
isBuilding bool

Is the landcover contains building area.

False
windMeteorologicalDirection

The meteorological angle. Must be specified only if data includes urbanic area.

None
resolution

The size of the squares. Must be specified only if data includes urbanic area.

None

Returns:

Type Description
DataArray
Source code in hera/measurements/GIS/raster/landcover.py
def getRoughness(self, minx, miny, maxx, maxy, dxdy=30, inputCRS=WSG84, dataSourceName=None, isBuilding=False,
                 windMeteorologicalDirection=None, resolution=None, GIS_BUILDINGS_dataSourceName=None):
    """
    Returns Xarray LandCover map with Roughness (zo) field.
    Just as applying getLandCover and getRoughnessFromLandcover at the same time.

    Parameters
    ----------
    minx : float
        Minimum value of X axis. If using WSG84 - the minimum Latitude coordinates.

    miny : float
        Minimum value of Y axis. If using WSG84 - the minimum Longitude coordinates.

    maxx : float
        Maximum value of X axis. If using WSG84 - the maximum Latitude coordinates.

    maxy : float
        Maximum value of Y axis. If using WSG84 - the maximum Longitude coordinates.

    dxdy : int, default=30
        Spatial resolution of the output land cover map.

    inputCRS : int, default=WSG84
        The EPSG of the coordinates.

    dataSourceName : string, default=None
        The name of the data source to use.

    isBuilding : bool, default=False
        Is the landcover contains building area.

    windMeteorologicalDirection: double, default=None
        The meteorological angle. Must be specified only if data includes urbanic area.

    resolution: double, default=None
        The size of the squares. Must be specified only if data includes urbanic area.

    Returns
    -------
    xarray.DataArray
    """
    landcover = self.getLandCover(minx, miny, maxx, maxy, dxdy=dxdy, inputCRS=inputCRS,
                                  dataSourceName=dataSourceName)
    landcover = self.getRoughnessFromLandcover(
        landcover,
        windMeteorologicalDirection=windMeteorologicalDirection,
        resolution=resolution,
        isBuilding=isBuilding,
        dataSourceName=dataSourceName,
        GIS_BUILDINGS_dataSourceName=GIS_BUILDINGS_dataSourceName
    )
    return landcover

getCodingMap(datasourceName)

Returns dictionary that maps landcover int value to string of landcover.

Parameters:

Name Type Description Default
datasourceName str

Datasource type (Type-1, Type-2 etc.)

required

Returns:

Type Description
dict
Source code in hera/measurements/GIS/raster/landcover.py
def getCodingMap(self, datasourceName):
    """
    Returns dictionary that maps landcover int value to string of landcover.

    Parameters
    ----------
    datasourceName : str
        Datasource type (Type-1, Type-2 etc.)

    Returns
    -------
    dict
    """
    if datasourceName == 'Type-1':
        return {
            0: "Water",
            1: "Evergreen needleleaf forest",
            2: "Evergreen broadleaf forest",
            3: "Deciduous needleleaf forest",
            4: "Deciduous broadleaf forest",
            5: "Mixed forests",
            6: "Closed shrubland",
            7: "Open shrublands",
            8: "Woody savannas",
            9: "Savannas",
            10: "Grasslands",
            11: "Permanent wetlands",
            12: "Croplands",
            13: "Urban and built-up",
            14: "Cropland/natural vegetation mosaic",
            15: "Snow and ice",
            16: "Barren or sparsely vegetated"
        }
    return {}

roughnesslength2sandgrainroughness(rl) staticmethod

Convert roughness length to equivalent sand grain roughness (Ks = z0 * 30).

Source code in hera/measurements/GIS/raster/landcover.py
@staticmethod
def roughnesslength2sandgrainroughness(rl):
    """Convert roughness length to equivalent sand grain roughness (Ks = z0 * 30)."""
#Desmond, C. J., Watson, S. J., & Hancock, P. E. (2017). Modelling the wind energy resource in complex terrain and atmospheres. Numerical simulation and wind tunnel investigation of non-neutral forest canopy flow. Journal of wind engineering and industrial aerodynamics, 166, 48-60.‏
# https://www.sciencedirect.com/science/article/pii/S0167610516300083#bib12
# eq. 5: Equivalent sand grain roughness (m) is z0*30

# we can you it for "nutkRoughWallFunction" boundary condition for Ks (sand grain roughness) parameter
# Cs value can be set as 0.5

    return rl*30.0 # return Ks value

TilesToolkit

hera.measurements.GIS.raster.tiles.TilesToolkit

Bases: abstractToolkit

A class to handle an image that represents a location. Looks up the location in the public database in project 'imageLocation'.

Source code in hera/measurements/GIS/raster/tiles.py
class TilesToolkit(toolkit.abstractToolkit):
    """
    A class to handle an image that represents a location.
    Looks up the location in the public database in project 'imageLocation'.
    """
    WSG84 = 4326
    ITM    = 2039
    ED50_ZONE36N = 23036

    Z0RES = 156543.03

    @property
    def doctype(self):
        """Return the document type string for tile images."""
        return f"{self.toolkitName}_PNG"


    def __init__(self, projectName, filesDirectory=None,connectionName=None):
        """Initialize the TilesToolkit.

        Parameters
        ----------
        projectName : str
            The project name to initialize the toolkit on.
        filesDirectory : str or None
            Path to save tile files. Uses default if None.
        connectionName : str or None
            Name of the database connection.
        """
        super().__init__(projectName=projectName,
                         toolkitName="Tiles",
                         filesDirectory=filesDirectory,
                         connectionName=connectionName)

        self._presentation = presentation(dataLayer=self)

    def tileScaleAtLatLonZoom(self,latitude,longitude,zoomlevel):
        """
        Returns the scale of a tile im meters at the location and zoom level

        Parameters
        ----------
        latitude : float
            The latitude in WGS84

        longitude : float
            The longitude in WGS

        zoomlevel : int
            The zoom to retrieve. usually up to ~19. (highest).

        Returns
        -------
            float
        """
        return self.Z0RES / (2 ** zoomlevel) * numpy.cos(numpy.deg2rad(latitude))

    def getImageFromCorners(self, minx, miny, maxx, maxy, zoomlevel, tileServer=None, inputCRS=WSG84, outputCRS=WSG84):
        """
        Gets the image from the lower left corner and upper right cornet - [left,right,bottom,top] in the coordinate system of the outputCRS.
        The lowerLeft,upperRight are given in WGS84 (degrees) if dgrees are True and in Israel 1993 / Israeli TM Grid.

        Parameters
        ----------
        minx: float
            Minimux X coordinate value of the image.

        miny: float
            Minimux Y coordinate value of the image.

        maxx: float
            Maximum X coordinate value of the image.

        maxy: float
            Maximum X coordinate value of the image.

        zoomlevel : int
            The zoom to retrieve. usually up to ~19. (highest).

        tileServer : string, default=None
            The tile server. If None, get the default one (defaultTileServer in the config)

        inputCRS : int,default=WSG84
            The ESPG of the input coordinates.

        outputCRS: int.default=WSG84
            The ESPG of the output coordinates.

        Returns
        -------
            tuple
        """
        logger = get_classMethod_logger(self,name="getImageFromTiles")
        logger.info(f"------- Start : {logger.name}")


        lon = [maxy, miny]
        lat = [minx, maxx]

        if tileServer is None:
            tileServer = self.getConfig().get("defaultTileServer",None)
            if tileServer is None:
                err = f"There is not default tile server in project {self.projectName}, and time server was not supplied!. exiting."
                logger.error(err)
                raise ValueError(err)

        gdf = geopandas.GeoDataFrame(
            None, geometry=geopandas.points_from_xy(lat, lon), crs=inputCRS  # "EPSG:4326"
        )

        logger.info(f"Converting the input coordinates from EPSG {inputCRS} to WGS84 (EPSG:4326)")
        gdf = gdf.to_crs(WSG84)

        tileULX, tileULY = self.deg2tile(gdf.iloc[0].geometry.y,gdf.iloc[0].geometry.x, zoomlevel)
        tileLRX, tileLRY = self.deg2tile(gdf.iloc[1].geometry.y,gdf.iloc[1].geometry.x, zoomlevel)

        tileurl = self.getDataSourceData(tileServer)
        tileurl = tileServer if tileurl is None else tileurl
        img,extent =  self._getImageFromTiles([tileULX, tileULY],[tileLRX, tileLRY], zoomlevel,tileurl)

        logger.info(f"Converting the output extent {extent} from WGS84 (EPSG:4326) to EPSG {outputCRS}")
        lat = [extent[0], extent[1]]
        lon = [extent[2], extent[3]]

        gdf = geopandas.GeoDataFrame(
            None, geometry=geopandas.points_from_xy(lat,lon), crs=WSG84
        )
        gdf = gdf.to_crs(outputCRS)

        extent = [gdf.iloc[0].geometry.x,gdf.iloc[1].geometry.x,gdf.iloc[1].geometry.y,gdf.iloc[0].geometry.y]
        return img,extent




    def _getImageFromTiles(self, ulTiles, lrTiles, zoomLevel, tileServer, square=True):
        """
            Creates an image with range of the tiles and compute their
            extent in degrees

            The pro

        Parameters
        ----------
        llTiles : list, tuple
                    The lower left corner tile. (latitude (NE),longitude (EW))
        urTiles : list, tuple
                    The upper right corner tile. (latitude (NE),longitude (EW))

        zoomLevel : integer
                    The zoom to retrieve. usually up to ~19. (highest).

        square : bool
                If true, return a square image by extening the minimal dimension.

        tileServer: string
                The url of the tile server. We assume that the zoom, x and y are give by {z},{x} and {y}.
        Returns
        -------
            Tuple. :
                The Image and a the extent of the image in degrees (WGS84): [left,right,bottom,top]
        """
        logger = get_classMethod_logger(self,name="getImageFromTiles")
        logger.info("------- Start")

        height = numpy.abs(ulTiles[1] - lrTiles[1])
        width = numpy.abs(ulTiles[0] - lrTiles[0])

        logger.debug(f"The number of tiles in x: {ulTiles[1]}-{lrTiles[1]}={width} and y: {ulTiles[0]}-{lrTiles[0]} {height}.")

        if square:
            sqrx = numpy.max([width, height])
            sqry = numpy.max([width, height])
            logger.debug(f"Squaring the image to be of equal height and width: {sqrx}")
        else:
            logger.debug(f"Get the image as requested (no squaring)")
            sqry = height
            sqrx = width

        lrTiles[0] = ulTiles[0] + sqrx
        lrTiles[1] = ulTiles[1]+ sqry

        TILESIZE = 256
        finalimg = Image.new('RGB', (TILESIZE * sqrx, TILESIZE * sqry))
        logger.info(f"Getting the Image  (tiles {sqrx}x{sqry}) from {tileServer}")
        for i, j in product(range(sqrx), range(sqry)):
            #response = requests.get(f"http://192.168.14.118/resat_tiles/{zoomlevel}/{tileLLX + i}/{}.png")
            logger.debug(f"Getting address {tileServer.format(z=zoomLevel,x=ulTiles[0] + i,y=ulTiles[1] + j)}")
            response = requests.get(tileServer.format(z=zoomLevel,x=ulTiles[0] + i,y=ulTiles[1] + j))
            img = Image.open(BytesIO(response.content))
            logger.debug(f"({i},{j}) --> {ulTiles[0] + i}, {ulTiles[1] + j}")
            finalimg.paste(img, (i * TILESIZE, j * TILESIZE))

        boundULY, boundULX = self.tile2deg(ulTiles[0], ulTiles[1] , zoomLevel)
        boundLRY, boundLRX = self.tile2deg(lrTiles[0], lrTiles[1], zoomLevel)
        logger.info(f"Got the image extent (upper left -> lower right): ({ulTiles[0]},{ulTiles[1]})={[boundULY, boundULX]} and ({lrTiles[0]},{lrTiles[1]})={[boundLRY, boundLRX]}")

        return finalimg,[boundULX,boundLRX,boundULY,boundLRY]




    def tile2deg(self, xtile, ytile, zoom):
        """Convert tile coordinates to WGS84 latitude/longitude.

        Parameters
        ----------
        xtile, ytile : int
            Tile x and y indices.
        zoom : int
            Zoom level.

        Returns
        -------
        tuple of float
            (latitude, longitude) in degrees.
        """
        n = 2.0 ** zoom
        lon_deg = xtile / n * 360.0 - 180.0
        lat_rad = math.atan(math.sinh(math.pi * (1 - 2 * ytile / n)))
        lat_deg = math.degrees(lat_rad)
        return (lat_deg, lon_deg)

    def deg2tile(self, lat_deg, lon_deg, zoom):
        """Convert WGS84 latitude/longitude to tile coordinates.

        Parameters
        ----------
        lat_deg, lon_deg : float
            Latitude and longitude in degrees.
        zoom : int
            Zoom level.

        Returns
        -------
        tuple of int
            (xtile, ytile) tile indices.
        """
        lat_rad = math.radians(lat_deg)
        n = 2.0 ** zoom
        xtile = int((lon_deg + 180.0) / 360.0 * n)
        ytile = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
        return (xtile, ytile)

    def listImages(self,**filters):
        """Return stored tile image documents matching the given filters."""
        return self.getMeasurementsDocuments(type=self.doctype, **filters)

    def setDefaultTileServer(self,server):
        """Set the default tile server URL in the project config.

        Parameters
        ----------
        server : str
            URL template for the tile server.
        """
        self.setConfig(**{"defaultTileServer": server})

doctype property

Return the document type string for tile images.

__init__(projectName, filesDirectory=None, connectionName=None)

Initialize the TilesToolkit.

Parameters:

Name Type Description Default
projectName str

The project name to initialize the toolkit on.

required
filesDirectory str or None

Path to save tile files. Uses default if None.

None
connectionName str or None

Name of the database connection.

None
Source code in hera/measurements/GIS/raster/tiles.py
def __init__(self, projectName, filesDirectory=None,connectionName=None):
    """Initialize the TilesToolkit.

    Parameters
    ----------
    projectName : str
        The project name to initialize the toolkit on.
    filesDirectory : str or None
        Path to save tile files. Uses default if None.
    connectionName : str or None
        Name of the database connection.
    """
    super().__init__(projectName=projectName,
                     toolkitName="Tiles",
                     filesDirectory=filesDirectory,
                     connectionName=connectionName)

    self._presentation = presentation(dataLayer=self)

tileScaleAtLatLonZoom(latitude, longitude, zoomlevel)

Returns the scale of a tile im meters at the location and zoom level

Parameters:

Name Type Description Default
latitude float

The latitude in WGS84

required
longitude float

The longitude in WGS

required
zoomlevel int

The zoom to retrieve. usually up to ~19. (highest).

required

Returns:

Type Description
float
Source code in hera/measurements/GIS/raster/tiles.py
def tileScaleAtLatLonZoom(self,latitude,longitude,zoomlevel):
    """
    Returns the scale of a tile im meters at the location and zoom level

    Parameters
    ----------
    latitude : float
        The latitude in WGS84

    longitude : float
        The longitude in WGS

    zoomlevel : int
        The zoom to retrieve. usually up to ~19. (highest).

    Returns
    -------
        float
    """
    return self.Z0RES / (2 ** zoomlevel) * numpy.cos(numpy.deg2rad(latitude))

getImageFromCorners(minx, miny, maxx, maxy, zoomlevel, tileServer=None, inputCRS=WSG84, outputCRS=WSG84)

Gets the image from the lower left corner and upper right cornet - [left,right,bottom,top] in the coordinate system of the outputCRS. The lowerLeft,upperRight are given in WGS84 (degrees) if dgrees are True and in Israel 1993 / Israeli TM Grid.

Parameters:

Name Type Description Default
minx

Minimux X coordinate value of the image.

required
miny

Minimux Y coordinate value of the image.

required
maxx

Maximum X coordinate value of the image.

required
maxy

Maximum X coordinate value of the image.

required
zoomlevel int

The zoom to retrieve. usually up to ~19. (highest).

required
tileServer string

The tile server. If None, get the default one (defaultTileServer in the config)

None
inputCRS int,default=WSG84

The ESPG of the input coordinates.

WSG84
outputCRS

The ESPG of the output coordinates.

WSG84

Returns:

Type Description
tuple
Source code in hera/measurements/GIS/raster/tiles.py
def getImageFromCorners(self, minx, miny, maxx, maxy, zoomlevel, tileServer=None, inputCRS=WSG84, outputCRS=WSG84):
    """
    Gets the image from the lower left corner and upper right cornet - [left,right,bottom,top] in the coordinate system of the outputCRS.
    The lowerLeft,upperRight are given in WGS84 (degrees) if dgrees are True and in Israel 1993 / Israeli TM Grid.

    Parameters
    ----------
    minx: float
        Minimux X coordinate value of the image.

    miny: float
        Minimux Y coordinate value of the image.

    maxx: float
        Maximum X coordinate value of the image.

    maxy: float
        Maximum X coordinate value of the image.

    zoomlevel : int
        The zoom to retrieve. usually up to ~19. (highest).

    tileServer : string, default=None
        The tile server. If None, get the default one (defaultTileServer in the config)

    inputCRS : int,default=WSG84
        The ESPG of the input coordinates.

    outputCRS: int.default=WSG84
        The ESPG of the output coordinates.

    Returns
    -------
        tuple
    """
    logger = get_classMethod_logger(self,name="getImageFromTiles")
    logger.info(f"------- Start : {logger.name}")


    lon = [maxy, miny]
    lat = [minx, maxx]

    if tileServer is None:
        tileServer = self.getConfig().get("defaultTileServer",None)
        if tileServer is None:
            err = f"There is not default tile server in project {self.projectName}, and time server was not supplied!. exiting."
            logger.error(err)
            raise ValueError(err)

    gdf = geopandas.GeoDataFrame(
        None, geometry=geopandas.points_from_xy(lat, lon), crs=inputCRS  # "EPSG:4326"
    )

    logger.info(f"Converting the input coordinates from EPSG {inputCRS} to WGS84 (EPSG:4326)")
    gdf = gdf.to_crs(WSG84)

    tileULX, tileULY = self.deg2tile(gdf.iloc[0].geometry.y,gdf.iloc[0].geometry.x, zoomlevel)
    tileLRX, tileLRY = self.deg2tile(gdf.iloc[1].geometry.y,gdf.iloc[1].geometry.x, zoomlevel)

    tileurl = self.getDataSourceData(tileServer)
    tileurl = tileServer if tileurl is None else tileurl
    img,extent =  self._getImageFromTiles([tileULX, tileULY],[tileLRX, tileLRY], zoomlevel,tileurl)

    logger.info(f"Converting the output extent {extent} from WGS84 (EPSG:4326) to EPSG {outputCRS}")
    lat = [extent[0], extent[1]]
    lon = [extent[2], extent[3]]

    gdf = geopandas.GeoDataFrame(
        None, geometry=geopandas.points_from_xy(lat,lon), crs=WSG84
    )
    gdf = gdf.to_crs(outputCRS)

    extent = [gdf.iloc[0].geometry.x,gdf.iloc[1].geometry.x,gdf.iloc[1].geometry.y,gdf.iloc[0].geometry.y]
    return img,extent

tile2deg(xtile, ytile, zoom)

Convert tile coordinates to WGS84 latitude/longitude.

Parameters:

Name Type Description Default
xtile int

Tile x and y indices.

required
ytile int

Tile x and y indices.

required
zoom int

Zoom level.

required

Returns:

Type Description
tuple of float

(latitude, longitude) in degrees.

Source code in hera/measurements/GIS/raster/tiles.py
def tile2deg(self, xtile, ytile, zoom):
    """Convert tile coordinates to WGS84 latitude/longitude.

    Parameters
    ----------
    xtile, ytile : int
        Tile x and y indices.
    zoom : int
        Zoom level.

    Returns
    -------
    tuple of float
        (latitude, longitude) in degrees.
    """
    n = 2.0 ** zoom
    lon_deg = xtile / n * 360.0 - 180.0
    lat_rad = math.atan(math.sinh(math.pi * (1 - 2 * ytile / n)))
    lat_deg = math.degrees(lat_rad)
    return (lat_deg, lon_deg)

deg2tile(lat_deg, lon_deg, zoom)

Convert WGS84 latitude/longitude to tile coordinates.

Parameters:

Name Type Description Default
lat_deg float

Latitude and longitude in degrees.

required
lon_deg float

Latitude and longitude in degrees.

required
zoom int

Zoom level.

required

Returns:

Type Description
tuple of int

(xtile, ytile) tile indices.

Source code in hera/measurements/GIS/raster/tiles.py
def deg2tile(self, lat_deg, lon_deg, zoom):
    """Convert WGS84 latitude/longitude to tile coordinates.

    Parameters
    ----------
    lat_deg, lon_deg : float
        Latitude and longitude in degrees.
    zoom : int
        Zoom level.

    Returns
    -------
    tuple of int
        (xtile, ytile) tile indices.
    """
    lat_rad = math.radians(lat_deg)
    n = 2.0 ** zoom
    xtile = int((lon_deg + 180.0) / 360.0 * n)
    ytile = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
    return (xtile, ytile)

listImages(**filters)

Return stored tile image documents matching the given filters.

Source code in hera/measurements/GIS/raster/tiles.py
def listImages(self,**filters):
    """Return stored tile image documents matching the given filters."""
    return self.getMeasurementsDocuments(type=self.doctype, **filters)

setDefaultTileServer(server)

Set the default tile server URL in the project config.

Parameters:

Name Type Description Default
server str

URL template for the tile server.

required
Source code in hera/measurements/GIS/raster/tiles.py
def setDefaultTileServer(self,server):
    """Set the default tile server URL in the project config.

    Parameters
    ----------
    server : str
        URL template for the tile server.
    """
    self.setConfig(**{"defaultTileServer": server})

Meteorology

lowFreqToolKit

hera.measurements.meteorology.lowfreqdata.toolkit.lowFreqToolKit

Bases: abstractToolkit

Manages the loading and storing of low frequency meteorological data

The data can be in the formats:

  • TOAA
  • IMS_JSON

TODO: Complete the other parsers from the older versions.

Source code in hera/measurements/meteorology/lowfreqdata/toolkit.py
class lowFreqToolKit(toolkit.abstractToolkit):
    """
        Manages the loading and storing of low frequency meteorological data

        The data can be in the formats:

        - TOAA
        - IMS_JSON
        -

        TODO:
            Complete the other parsers from the older versions.

    """

    _np_size = None
    _doc_type = None


    STATIONNAME = 'StationName'

    def __init__(self, projectName, filesDirectory=None,connectionName=None):
        """
            Initializes a datalayer for the lowfreqdata data.

            Also looks up the 'IMSData' in the public database.

        Parameters
        ----------

        projectName: str
                The project name
        """
        super().__init__(projectName=projectName, toolkitName="lowFreqMeteorology", filesDirectory=filesDirectory,connectionName=connectionName)
        self.logger = logging.getLogger()
        self.logger.info("Init Low frequency data")

        self._analysis = analysis(self)
        self._presentation = presenation(self,self.analysis)


        self._np_size = "100MB"


    @property
    def docType(self):
        return f"{self.toolkitName}_LowFreqData"

__init__(projectName, filesDirectory=None, connectionName=None)

Initializes a datalayer for the lowfreqdata data.

Also looks up the 'IMSData' in the public database.

Parameters:

Name Type Description Default
projectName
The project name
required
Source code in hera/measurements/meteorology/lowfreqdata/toolkit.py
def __init__(self, projectName, filesDirectory=None,connectionName=None):
    """
        Initializes a datalayer for the lowfreqdata data.

        Also looks up the 'IMSData' in the public database.

    Parameters
    ----------

    projectName: str
            The project name
    """
    super().__init__(projectName=projectName, toolkitName="lowFreqMeteorology", filesDirectory=filesDirectory,connectionName=connectionName)
    self.logger = logging.getLogger()
    self.logger.info("Init Low frequency data")

    self._analysis = analysis(self)
    self._presentation = presenation(self,self.analysis)


    self._np_size = "100MB"

HighFreqToolKit

hera.measurements.meteorology.highfreqdata.toolkit.HighFreqToolKit

Bases: abstractToolkit

Toolkit for high-frequency (10–20 Hz) sonic anemometer and TRH data.

Manages parsing raw binary/ASCII sensor files, normalising the output, storing data as versioned data sources in the project, and providing an analysis layer for turbulence statistics.

Supported raw formats:

  • Campbell Scientific TOB1 binary — via :class:Parser
  • Campbell Scientific TOA5 ASCII — via :class:ASCIIParser
Source code in hera/measurements/meteorology/highfreqdata/toolkit.py
class HighFreqToolKit(toolkit.abstractToolkit):
    """Toolkit for high-frequency (10–20 Hz) sonic anemometer and TRH data.

    Manages parsing raw binary/ASCII sensor files, normalising the output,
    storing data as versioned data sources in the project, and providing
    an analysis layer for turbulence statistics.

    Supported raw formats:

    - **Campbell Scientific TOB1 binary** — via :class:`Parser`
    - **Campbell Scientific TOA5 ASCII** — via :class:`ASCIIParser`
    """

    DOCTYPE_STATIONS = 'StationsData'
    DOCTYPE_MEASUREMENTS = 'MeasurementsData'

    def __init__(self, projectName, filesDirectory=None, connectionName=None):
        """Initialise the high-frequency meteorology toolkit.

        Parameters
        ----------
        projectName : str
            The project name.
        filesDirectory : str, optional
            Directory for saving parquet files. If ``None``, uses the
            project default.
        connectionName : str, optional
            Database connection name.
        """
        super().__init__(
            projectName=projectName,
            toolkitName="highFreqMeteorology",
            filesDirectory=filesDirectory,
            connectionName=connectionName,
        )
        logger = get_classMethod_logger(self, "init")
        logger.info("Init High frequency data")
        self._analysis = RawdataAnalysis(self)

    @property
    def docType(self):
        """str : Document type identifier for high-frequency data."""
        return f"{self.toolkitName}_HighFreqData"

    # ------------------------------------------------------------------
    # Parsing + normalising
    # ------------------------------------------------------------------

    def parseData(self, path, fromTime=None, toTime=None, parser="auto"):
        """Parse a raw data file and return normalised DataFrames.

        Automatically detects the file format (Campbell binary vs TOA5
        ASCII) unless *parser* is specified explicitly. The output is
        normalised to the standard column format expected by the analysis
        layer (lowercase ``u, v, w, T`` with a ``DatetimeIndex``).

        Parameters
        ----------
        path : str
            Path to a raw data file or a directory of files.
        fromTime : str or None, optional
            Start time filter.
        toTime : str or None, optional
            End time filter.
        parser : str, optional
            Parser to use: ``'auto'`` (default), ``'campbell'``, or
            ``'toa5'``.

        Returns
        -------
        list of tuple(pandas.DataFrame, dict)
            Each element is ``(normalised_dataframe, metadata_dict)``.
            A single-device file produces a list with one element.
            Multi-device files (TOA5) produce one element per device.
        """
        raw = self._parse_raw(path, fromTime, toTime, parser)
        return self._normalise_raw(raw)

    def loadData(self, name, path, outputDirectory, fromTime=None, toTime=None,
                 parser="auto", version=(0, 0, 1), overwrite=False,
                 append=False, metadata=None):
        """Parse a raw data file, save as parquet, and register as a data source.

        This is the recommended way to ingest new high-frequency data. It
        parses, normalises, writes a parquet file, and registers the result
        as a versioned data source in the project.

        If a data source with the same name already exists in the DB:

        - **append=True**: loads the existing parquet, concatenates the new
          data, and saves back to the same file.
        - **overwrite=True**: replaces the existing file and document.
        - Both ``False`` (default): raises ``ValueError``.
        - Both ``True``: raises ``ValueError`` (mutually exclusive).

        If the data source does not exist, a new file is created using the
        project counter to generate a unique filename.

        Parameters
        ----------
        name : str
            Data source name (e.g. ``'sonic_10m'``). For multi-device
            files, device names are appended (``'sonic_10m_Raw_Sonic_1'``).
        path : str
            Path to a raw data file or directory to parse.
        outputDirectory : str
            Directory where the parquet file will be stored.
        fromTime : str or None, optional
            Start time filter.
        toTime : str or None, optional
            End time filter.
        parser : str, optional
            ``'auto'``, ``'campbell'``, or ``'toa5'``.
        version : tuple of int, optional
            Data source version ``(major, minor, patch)``. Default ``(0, 0, 1)``.
        overwrite : bool, optional
            If ``True``, replace an existing data source.
        append : bool, optional
            If ``True``, append to an existing data source's parquet file.
        metadata : dict, optional
            User-controlled metadata to store with the data source.

        Returns
        -------
        document or list of documents
            The created/updated data source document(s).

        Raises
        ------
        ValueError
            If both *overwrite* and *append* are ``True``, or if the data
            source already exists and neither flag is set.
        """
        if overwrite and append:
            raise ValueError("overwrite and append are mutually exclusive.")

        results = self.parseData(path, fromTime, toTime, parser)

        os.makedirs(outputDirectory, exist_ok=True)

        docs = []
        for df, meta in results:
            # Build data source name: append device info for multi-device
            if len(results) == 1:
                ds_name = name
            else:
                suffix = meta.get("deviceName", meta.get("deviceType", ""))
                ds_name = f"{name}_{suffix}" if suffix else name

            desc = dict(metadata) if metadata else {}

            # Check if data source already exists
            existing_doc = self.getDataSourceDocument(ds_name, version=list(version))

            if existing_doc is not None:
                if append:
                    # Load existing data, concatenate, save back
                    import pandas as pd
                    existing_df = existing_doc.getData()
                    if hasattr(existing_df, "compute"):
                        existing_df = existing_df.compute()
                    combined = pd.concat([existing_df, df]).sort_index()
                    combined = combined[~combined.index.duplicated(keep="last")]
                    output_file = existing_doc.resource
                    combined.to_parquet(output_file)
                    # Document already points to this file — no DB update needed
                    docs.append(existing_doc)
                    continue
                elif overwrite:
                    # Will be handled by addDataSource(overwrite=True) below
                    output_file = existing_doc.resource
                else:
                    raise ValueError(
                        f"Data source '{ds_name}' (version {version}) already exists. "
                        f"Use overwrite=True or append=True."
                    )
            else:
                # New data source — generate filename with counter
                file_id = self.getCounterAndAdd(f"highfreq_{ds_name}")
                output_file = os.path.join(
                    outputDirectory, f"{ds_name}_{file_id}.parquet"
                )

            df.to_parquet(output_file)

            doc = self.addDataSource(
                dataSourceName=ds_name,
                resource=output_file,
                dataFormat=datatypes.PARQUET,
                version=list(version),
                overwrite=overwrite,
                **desc,
            )
            docs.append(doc)

        return docs if len(docs) > 1 else docs[0]

    # ------------------------------------------------------------------
    # Legacy methods (backward compatible, now normalise output)
    # ------------------------------------------------------------------

    def campbelToParquet(self, binaryFile, fromTime=None, toTime=None,
                         chunkSize=10000):
        """Parse a Campbell binary file and return a normalised DataFrame.

        .. deprecated::
            Use :meth:`parseData` or :meth:`loadData` instead.

        Parameters
        ----------
        binaryFile : str
            Path to the Campbell binary ``.dat`` file.
        fromTime : str or None, optional
            Start time filter.
        toTime : str or None, optional
            End time filter.
        chunkSize : int, optional
            Records per batch (default 10000).

        Returns
        -------
        pandas.DataFrame
            Normalised DataFrame with lowercase columns and DatetimeIndex.
        """
        warnings.warn(
            "campbelToParquet() is deprecated. Use parseData() or loadData() instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        campelParser = Parser(chunkSize=chunkSize)
        raw_df = campelParser.parse(path=binaryFile, fromTime=fromTime, toTime=toTime)
        results = self._normalise_raw(raw_df)
        if len(results) == 1:
            return results[0][0]
        return {meta.get("deviceName", f"device_{i}"): df for i, (df, meta) in enumerate(results)}

    def asciiToParquet(self, path, fromTime=None, toTime=None):
        """Parse a TOA5 ASCII file and return normalised DataFrames.

        .. deprecated::
            Use :meth:`parseData` or :meth:`loadData` instead.

        Parameters
        ----------
        path : str
            Path to a ``.dat`` file or directory.
        fromTime : str or None, optional
            Start time filter.
        toTime : str or None, optional
            End time filter.

        Returns
        -------
        dict
            Mapping of device names to normalised DataFrames.
        """
        warnings.warn(
            "asciiToParquet() is deprecated. Use parseData() or loadData() instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        asciiParser = ASCIIParser()
        raw = asciiParser.parse(path=path, fromTime=fromTime, toTime=toTime)
        results = self._normalise_raw(raw)
        return {meta.get("deviceName", f"device_{i}"): df for i, (df, meta) in enumerate(results)}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _parse_raw(self, path, fromTime, toTime, parser):
        """Parse raw data using the specified or auto-detected parser.

        Returns the raw (un-normalised) parser output.
        """
        if parser == "auto":
            parser = self._detect_parser(path)

        if parser == "campbell":
            p = Parser()
            return p.parse(path=path, fromTime=fromTime, toTime=toTime)
        elif parser == "toa5":
            p = ASCIIParser()
            return p.parse(path=path, fromTime=fromTime, toTime=toTime)
        else:
            raise ValueError(f"Unknown parser '{parser}'. Use 'auto', 'campbell', or 'toa5'.")

    @staticmethod
    def _detect_parser(path):
        """Auto-detect parser type from file content.

        Checks the first bytes of the file: TOB1 binary files start with
        a text header line containing ``'TOB1'``; TOA5 files start with
        ``'TOA5'`` or are CSV-like. Falls back to ``'campbell'`` for
        ``.dat`` files.
        """
        target = path
        if os.path.isdir(path):
            # Pick first .dat file in the directory
            for f in os.listdir(path):
                if f.endswith(".dat"):
                    target = os.path.join(path, f)
                    break
            else:
                raise FileNotFoundError(f"No .dat files found in {path}")

        try:
            with open(target, "rb") as fh:
                header = fh.read(512)
            # TOA5 ASCII files have 'TOA5' near the beginning
            if b"TOA5" in header[:100]:
                return "toa5"
            # TOB1 binary files have 'TOB1' near the beginning
            if b"TOB1" in header[:100]:
                return "campbell"
        except Exception:
            pass

        # Fallback: try to read as CSV (TOA5)
        try:
            with open(target, "r") as fh:
                first_line = fh.readline()
            if "," in first_line and any(kw in first_line for kw in ("Raw_Sonic", "Tct_Trh", "TIMESTAMP")):
                return "toa5"
        except Exception:
            pass

        # Default to campbell for binary .dat files
        return "campbell"

    @staticmethod
    def _normalise_raw(raw):
        """Normalise raw parser output to a list of (DataFrame, metadata).

        Handles both single-DataFrame (CampbellBinary) and dict-of-DataFrames
        (TOA5) output.
        """
        import pandas as pd

        if isinstance(raw, dict):
            # TOA5 returns dict[device_name → DataFrame]
            results = []
            for device_name, df in raw.items():
                dtype = detect_device_type(df)
                if dtype == "sonic":
                    norm_df, meta = normalise_sonic_df(df)
                elif dtype == "TRH":
                    norm_df, meta = normalise_trh_df(df)
                else:
                    # Unknown type — try sonic normalisation
                    norm_df, meta = normalise_sonic_df(df)
                meta["deviceName"] = device_name
                results.append((norm_df, meta))
            return results

        elif isinstance(raw, pd.DataFrame):
            # CampbellBinary returns a single DataFrame
            dtype = detect_device_type(raw)
            if dtype == "sonic":
                norm_df, meta = normalise_sonic_df(raw)
            elif dtype == "TRH":
                norm_df, meta = normalise_trh_df(raw)
            else:
                norm_df, meta = normalise_sonic_df(raw)
            return [(norm_df, meta)]

        else:
            raise TypeError(f"Unexpected parser output type: {type(raw)}")

docType property

str : Document type identifier for high-frequency data.

__init__(projectName, filesDirectory=None, connectionName=None)

Initialise the high-frequency meteorology toolkit.

Parameters:

Name Type Description Default
projectName str

The project name.

required
filesDirectory str

Directory for saving parquet files. If None, uses the project default.

None
connectionName str

Database connection name.

None
Source code in hera/measurements/meteorology/highfreqdata/toolkit.py
def __init__(self, projectName, filesDirectory=None, connectionName=None):
    """Initialise the high-frequency meteorology toolkit.

    Parameters
    ----------
    projectName : str
        The project name.
    filesDirectory : str, optional
        Directory for saving parquet files. If ``None``, uses the
        project default.
    connectionName : str, optional
        Database connection name.
    """
    super().__init__(
        projectName=projectName,
        toolkitName="highFreqMeteorology",
        filesDirectory=filesDirectory,
        connectionName=connectionName,
    )
    logger = get_classMethod_logger(self, "init")
    logger.info("Init High frequency data")
    self._analysis = RawdataAnalysis(self)

parseData(path, fromTime=None, toTime=None, parser='auto')

Parse a raw data file and return normalised DataFrames.

Automatically detects the file format (Campbell binary vs TOA5 ASCII) unless parser is specified explicitly. The output is normalised to the standard column format expected by the analysis layer (lowercase u, v, w, T with a DatetimeIndex).

Parameters:

Name Type Description Default
path str

Path to a raw data file or a directory of files.

required
fromTime str or None

Start time filter.

None
toTime str or None

End time filter.

None
parser str

Parser to use: 'auto' (default), 'campbell', or 'toa5'.

'auto'

Returns:

Type Description
list of tuple(pandas.DataFrame, dict)

Each element is (normalised_dataframe, metadata_dict). A single-device file produces a list with one element. Multi-device files (TOA5) produce one element per device.

Source code in hera/measurements/meteorology/highfreqdata/toolkit.py
def parseData(self, path, fromTime=None, toTime=None, parser="auto"):
    """Parse a raw data file and return normalised DataFrames.

    Automatically detects the file format (Campbell binary vs TOA5
    ASCII) unless *parser* is specified explicitly. The output is
    normalised to the standard column format expected by the analysis
    layer (lowercase ``u, v, w, T`` with a ``DatetimeIndex``).

    Parameters
    ----------
    path : str
        Path to a raw data file or a directory of files.
    fromTime : str or None, optional
        Start time filter.
    toTime : str or None, optional
        End time filter.
    parser : str, optional
        Parser to use: ``'auto'`` (default), ``'campbell'``, or
        ``'toa5'``.

    Returns
    -------
    list of tuple(pandas.DataFrame, dict)
        Each element is ``(normalised_dataframe, metadata_dict)``.
        A single-device file produces a list with one element.
        Multi-device files (TOA5) produce one element per device.
    """
    raw = self._parse_raw(path, fromTime, toTime, parser)
    return self._normalise_raw(raw)

loadData(name, path, outputDirectory, fromTime=None, toTime=None, parser='auto', version=(0, 0, 1), overwrite=False, append=False, metadata=None)

Parse a raw data file, save as parquet, and register as a data source.

This is the recommended way to ingest new high-frequency data. It parses, normalises, writes a parquet file, and registers the result as a versioned data source in the project.

If a data source with the same name already exists in the DB:

  • append=True: loads the existing parquet, concatenates the new data, and saves back to the same file.
  • overwrite=True: replaces the existing file and document.
  • Both False (default): raises ValueError.
  • Both True: raises ValueError (mutually exclusive).

If the data source does not exist, a new file is created using the project counter to generate a unique filename.

Parameters:

Name Type Description Default
name str

Data source name (e.g. 'sonic_10m'). For multi-device files, device names are appended ('sonic_10m_Raw_Sonic_1').

required
path str

Path to a raw data file or directory to parse.

required
outputDirectory str

Directory where the parquet file will be stored.

required
fromTime str or None

Start time filter.

None
toTime str or None

End time filter.

None
parser str

'auto', 'campbell', or 'toa5'.

'auto'
version tuple of int

Data source version (major, minor, patch). Default (0, 0, 1).

(0, 0, 1)
overwrite bool

If True, replace an existing data source.

False
append bool

If True, append to an existing data source's parquet file.

False
metadata dict

User-controlled metadata to store with the data source.

None

Returns:

Type Description
document or list of documents

The created/updated data source document(s).

Raises:

Type Description
ValueError

If both overwrite and append are True, or if the data source already exists and neither flag is set.

Source code in hera/measurements/meteorology/highfreqdata/toolkit.py
def loadData(self, name, path, outputDirectory, fromTime=None, toTime=None,
             parser="auto", version=(0, 0, 1), overwrite=False,
             append=False, metadata=None):
    """Parse a raw data file, save as parquet, and register as a data source.

    This is the recommended way to ingest new high-frequency data. It
    parses, normalises, writes a parquet file, and registers the result
    as a versioned data source in the project.

    If a data source with the same name already exists in the DB:

    - **append=True**: loads the existing parquet, concatenates the new
      data, and saves back to the same file.
    - **overwrite=True**: replaces the existing file and document.
    - Both ``False`` (default): raises ``ValueError``.
    - Both ``True``: raises ``ValueError`` (mutually exclusive).

    If the data source does not exist, a new file is created using the
    project counter to generate a unique filename.

    Parameters
    ----------
    name : str
        Data source name (e.g. ``'sonic_10m'``). For multi-device
        files, device names are appended (``'sonic_10m_Raw_Sonic_1'``).
    path : str
        Path to a raw data file or directory to parse.
    outputDirectory : str
        Directory where the parquet file will be stored.
    fromTime : str or None, optional
        Start time filter.
    toTime : str or None, optional
        End time filter.
    parser : str, optional
        ``'auto'``, ``'campbell'``, or ``'toa5'``.
    version : tuple of int, optional
        Data source version ``(major, minor, patch)``. Default ``(0, 0, 1)``.
    overwrite : bool, optional
        If ``True``, replace an existing data source.
    append : bool, optional
        If ``True``, append to an existing data source's parquet file.
    metadata : dict, optional
        User-controlled metadata to store with the data source.

    Returns
    -------
    document or list of documents
        The created/updated data source document(s).

    Raises
    ------
    ValueError
        If both *overwrite* and *append* are ``True``, or if the data
        source already exists and neither flag is set.
    """
    if overwrite and append:
        raise ValueError("overwrite and append are mutually exclusive.")

    results = self.parseData(path, fromTime, toTime, parser)

    os.makedirs(outputDirectory, exist_ok=True)

    docs = []
    for df, meta in results:
        # Build data source name: append device info for multi-device
        if len(results) == 1:
            ds_name = name
        else:
            suffix = meta.get("deviceName", meta.get("deviceType", ""))
            ds_name = f"{name}_{suffix}" if suffix else name

        desc = dict(metadata) if metadata else {}

        # Check if data source already exists
        existing_doc = self.getDataSourceDocument(ds_name, version=list(version))

        if existing_doc is not None:
            if append:
                # Load existing data, concatenate, save back
                import pandas as pd
                existing_df = existing_doc.getData()
                if hasattr(existing_df, "compute"):
                    existing_df = existing_df.compute()
                combined = pd.concat([existing_df, df]).sort_index()
                combined = combined[~combined.index.duplicated(keep="last")]
                output_file = existing_doc.resource
                combined.to_parquet(output_file)
                # Document already points to this file — no DB update needed
                docs.append(existing_doc)
                continue
            elif overwrite:
                # Will be handled by addDataSource(overwrite=True) below
                output_file = existing_doc.resource
            else:
                raise ValueError(
                    f"Data source '{ds_name}' (version {version}) already exists. "
                    f"Use overwrite=True or append=True."
                )
        else:
            # New data source — generate filename with counter
            file_id = self.getCounterAndAdd(f"highfreq_{ds_name}")
            output_file = os.path.join(
                outputDirectory, f"{ds_name}_{file_id}.parquet"
            )

        df.to_parquet(output_file)

        doc = self.addDataSource(
            dataSourceName=ds_name,
            resource=output_file,
            dataFormat=datatypes.PARQUET,
            version=list(version),
            overwrite=overwrite,
            **desc,
        )
        docs.append(doc)

    return docs if len(docs) > 1 else docs[0]

campbelToParquet(binaryFile, fromTime=None, toTime=None, chunkSize=10000)

Parse a Campbell binary file and return a normalised DataFrame.

.. deprecated:: Use :meth:parseData or :meth:loadData instead.

Parameters:

Name Type Description Default
binaryFile str

Path to the Campbell binary .dat file.

required
fromTime str or None

Start time filter.

None
toTime str or None

End time filter.

None
chunkSize int

Records per batch (default 10000).

10000

Returns:

Type Description
DataFrame

Normalised DataFrame with lowercase columns and DatetimeIndex.

Source code in hera/measurements/meteorology/highfreqdata/toolkit.py
def campbelToParquet(self, binaryFile, fromTime=None, toTime=None,
                     chunkSize=10000):
    """Parse a Campbell binary file and return a normalised DataFrame.

    .. deprecated::
        Use :meth:`parseData` or :meth:`loadData` instead.

    Parameters
    ----------
    binaryFile : str
        Path to the Campbell binary ``.dat`` file.
    fromTime : str or None, optional
        Start time filter.
    toTime : str or None, optional
        End time filter.
    chunkSize : int, optional
        Records per batch (default 10000).

    Returns
    -------
    pandas.DataFrame
        Normalised DataFrame with lowercase columns and DatetimeIndex.
    """
    warnings.warn(
        "campbelToParquet() is deprecated. Use parseData() or loadData() instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    campelParser = Parser(chunkSize=chunkSize)
    raw_df = campelParser.parse(path=binaryFile, fromTime=fromTime, toTime=toTime)
    results = self._normalise_raw(raw_df)
    if len(results) == 1:
        return results[0][0]
    return {meta.get("deviceName", f"device_{i}"): df for i, (df, meta) in enumerate(results)}

asciiToParquet(path, fromTime=None, toTime=None)

Parse a TOA5 ASCII file and return normalised DataFrames.

.. deprecated:: Use :meth:parseData or :meth:loadData instead.

Parameters:

Name Type Description Default
path str

Path to a .dat file or directory.

required
fromTime str or None

Start time filter.

None
toTime str or None

End time filter.

None

Returns:

Type Description
dict

Mapping of device names to normalised DataFrames.

Source code in hera/measurements/meteorology/highfreqdata/toolkit.py
def asciiToParquet(self, path, fromTime=None, toTime=None):
    """Parse a TOA5 ASCII file and return normalised DataFrames.

    .. deprecated::
        Use :meth:`parseData` or :meth:`loadData` instead.

    Parameters
    ----------
    path : str
        Path to a ``.dat`` file or directory.
    fromTime : str or None, optional
        Start time filter.
    toTime : str or None, optional
        End time filter.

    Returns
    -------
    dict
        Mapping of device names to normalised DataFrames.
    """
    warnings.warn(
        "asciiToParquet() is deprecated. Use parseData() or loadData() instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    asciiParser = ASCIIParser()
    raw = asciiParser.parse(path=path, fromTime=fromTime, toTime=toTime)
    results = self._normalise_raw(raw)
    return {meta.get("deviceName", f"device_{i}"): df for i, (df, meta) in enumerate(results)}

Experiment

experimentHome

hera.measurements.experiment.experiment.experimentHome

Bases: abstractToolkit

This object functions as a factory/home for the other experiments. It is responsible for getting the right toolkit for the requested experiment.

Source code in hera/measurements/experiment/experiment.py
class experimentHome(toolkit.abstractToolkit):
    """
    This object functions as a factory/home for the other experiments.
    It is responsible for getting the right toolkit for the requested experiment.
    """

    DOCTYPE_ENTITIES = "EntitiesData"
    CODE_DIRECTORY = "code"

    def __init__(self, projectName, filesDirectory=None,connectionName=None):
        """Initialize the experiment home factory.

        Parameters
        ----------
        projectName : str
            Name of the project.
        filesDirectory : str, optional
            Directory for cache/intermediate files.
        connectionName : str, optional
            Name of the database connection to use.
        """
        super().__init__(projectName=projectName, toolkitName="experimentToolKit", filesDirectory=filesDirectory,connectionName=connectionName)
        self.logger = logging.getLogger()
        self.logger.info("Init experiment toolkit")

    @property
    def experimentMap(self):
        """
        Backward-compatibility alias.
        Historically this was a property, so we keep the interface,
        even though today the real logic is in getExperimentsMap().
        """
        return self.getExperimentsMap()

    def getExperimentsMap(self):
        """
        Get a dictionary mapping experiment name -> datasource document.

        Returns
        -------
        dict
            Keys are experiment names (datasourceName),
            values are the matching datasource documents.
        """
        experiments_map = {}
        for experiment in self.getDataSourceMap():
            experimentName = experiment["datasourceName"]
            experiments_map[experimentName] = experiment
        return experiments_map

    @property
    def experimentsTable(self):
        """
        Return a tabular view (DataFrame-like) of experiment datasources.
        """
        return self.getDataSourceTable()

    def getExperimentsTable(self):
        """
        Backward-compatible alias for experimentsTable.
        """
        return self.getDataSourceTable()

    def getExperiment(self, experimentName, filesDirectory=None):
        """
        Get the specific experiment class.

        Parameters
        ----------
        experimentName : str
            The name of the experiment.
        filesDirectory : str, optional
            The directory to save cache/intermediate files.
            If None, uses the current working directory / 'experimentCache'.

        Returns
        -------
        experimentSetupWithData
        """
        self.logger.info(f"Getting experiment {experimentName}")
        ds_doc = self.getDataSourceDocument(datasourceName=experimentName)

        if ds_doc:
            self.logger.info("Found experiment. Loading")
            experimentPath_raw = ds_doc.getData()

            # ---------------------------------------------------------------------
            # Compatibility / robustness additions (do not change core logic):
            #
            # Problem we saw in tests:
            #   ds_doc.getData() may return a *repo-relative* path like:
            #       "hera/tests/dynamic_loading/dummy_experiment"
            #   but our current working directory is often:
            #       ".../hera/hera/tests"
            #   so os.path.abspath() can produce:
            #       ".../hera/hera/tests/hera/tests/..."  (WRONG)
            #
            # Solution:
            #   Build a list of candidate experiment paths and choose the first one
            #   that actually exists AND contains the expected "code/" directory.
            # ---------------------------------------------------------------------

            candidates = []

            # 1) Raw value as-is (might already be absolute)
            candidates.append(experimentPath_raw)

            # 2) Absolute relative to current working directory (old behavior)
            try:
                candidates.append(os.path.abspath(experimentPath_raw))
            except Exception:
                pass

            # 3) If a repo root is available, interpret raw path relative to it
            repo_root = os.environ.get("HERA_REPO_ROOT")
            if repo_root:
                candidates.append(os.path.join(repo_root, experimentPath_raw))

            # Pick the first candidate that looks valid (dir exists + has "code/" inside)
            experimentPath = None
            code_dir = None
            for cand in candidates:
                try:
                    cand_abs = os.path.abspath(cand)
                    cand_code_dir = os.path.join(cand_abs, self.CODE_DIRECTORY)

                    # We consider it valid only if the experiment directory exists
                    # and contains the expected "code" directory.
                    if os.path.isdir(cand_abs) and os.path.isdir(cand_code_dir):
                        experimentPath = cand_abs
                        code_dir = cand_code_dir
                        break
                except Exception:
                    continue

            # If none matched, fall back to the previous "best effort" behavior
            if experimentPath is None:
                # This keeps backward compatibility: we still try something even if the
                # path doesn't exist, so error messages remain informative.
                experimentPath = os.path.abspath(experimentPath_raw)
                code_dir = os.path.join(experimentPath, self.CODE_DIRECTORY)

            # Add experiment's 'code' directory to sys.path so we can import its toolkit.
            # Use insert(0, ...) to prefer this experiment's code over other paths.
            if code_dir in sys.path:
                # Move to front to avoid importing an older module from a different path.
                try:
                    sys.path.remove(code_dir)
                except ValueError:
                    pass
            sys.path.insert(0, code_dir)

            self.logger.debug(f"Experiment path (raw): {experimentPath_raw}")
            self.logger.debug(f"Experiment path (resolved): {experimentPath}")
            self.logger.debug(f"Adding path {code_dir} to sys.path (priority)")

            toolkitName = f"{experimentName}.{experimentName}"
            self.logger.debug(f"Loading toolkit: {toolkitName}")

            toolkitCls = pydoc.locate(toolkitName)
            if toolkitCls is None:
                err = f"Cannot find toolkit {toolkitName} in {code_dir}"
                self.logger.error(err)
                raise ValueError(err)

            return toolkitCls(
                projectName=self.projectName,
                pathToExperiment=experimentPath,
                filesDirectory=filesDirectory,
            )

        err = (
            f"Experiment {experimentName} not found in Project {self.projectName}. "
            f"Please load the experiment to the project."
        )
        self.logger.error(err)
        raise ValueError(err)

    def keys(self):
        """
        Get the experiment names of the project.

        Returns
        -------
        list of str
        """
        return [name for name in self.getExperimentsMap()]

    def __getitem__(self, item):
        """
        Allow experimentHome['expName'] syntax to return a specific experiment.
        """
        return self.getExperiment(item)

    def experimentDataType(self):
        """
        Backward-compatibility hook for experiment data type.
        """
        return getattr(self, "_experimentDataType", None)

experimentMap property

Backward-compatibility alias. Historically this was a property, so we keep the interface, even though today the real logic is in getExperimentsMap().

experimentsTable property

Return a tabular view (DataFrame-like) of experiment datasources.

__init__(projectName, filesDirectory=None, connectionName=None)

Initialize the experiment home factory.

Parameters:

Name Type Description Default
projectName str

Name of the project.

required
filesDirectory str

Directory for cache/intermediate files.

None
connectionName str

Name of the database connection to use.

None
Source code in hera/measurements/experiment/experiment.py
def __init__(self, projectName, filesDirectory=None,connectionName=None):
    """Initialize the experiment home factory.

    Parameters
    ----------
    projectName : str
        Name of the project.
    filesDirectory : str, optional
        Directory for cache/intermediate files.
    connectionName : str, optional
        Name of the database connection to use.
    """
    super().__init__(projectName=projectName, toolkitName="experimentToolKit", filesDirectory=filesDirectory,connectionName=connectionName)
    self.logger = logging.getLogger()
    self.logger.info("Init experiment toolkit")

getExperimentsMap()

Get a dictionary mapping experiment name -> datasource document.

Returns:

Type Description
dict

Keys are experiment names (datasourceName), values are the matching datasource documents.

Source code in hera/measurements/experiment/experiment.py
def getExperimentsMap(self):
    """
    Get a dictionary mapping experiment name -> datasource document.

    Returns
    -------
    dict
        Keys are experiment names (datasourceName),
        values are the matching datasource documents.
    """
    experiments_map = {}
    for experiment in self.getDataSourceMap():
        experimentName = experiment["datasourceName"]
        experiments_map[experimentName] = experiment
    return experiments_map

getExperimentsTable()

Backward-compatible alias for experimentsTable.

Source code in hera/measurements/experiment/experiment.py
def getExperimentsTable(self):
    """
    Backward-compatible alias for experimentsTable.
    """
    return self.getDataSourceTable()

getExperiment(experimentName, filesDirectory=None)

Get the specific experiment class.

Parameters:

Name Type Description Default
experimentName str

The name of the experiment.

required
filesDirectory str

The directory to save cache/intermediate files. If None, uses the current working directory / 'experimentCache'.

None

Returns:

Type Description
experimentSetupWithData
Source code in hera/measurements/experiment/experiment.py
def getExperiment(self, experimentName, filesDirectory=None):
    """
    Get the specific experiment class.

    Parameters
    ----------
    experimentName : str
        The name of the experiment.
    filesDirectory : str, optional
        The directory to save cache/intermediate files.
        If None, uses the current working directory / 'experimentCache'.

    Returns
    -------
    experimentSetupWithData
    """
    self.logger.info(f"Getting experiment {experimentName}")
    ds_doc = self.getDataSourceDocument(datasourceName=experimentName)

    if ds_doc:
        self.logger.info("Found experiment. Loading")
        experimentPath_raw = ds_doc.getData()

        # ---------------------------------------------------------------------
        # Compatibility / robustness additions (do not change core logic):
        #
        # Problem we saw in tests:
        #   ds_doc.getData() may return a *repo-relative* path like:
        #       "hera/tests/dynamic_loading/dummy_experiment"
        #   but our current working directory is often:
        #       ".../hera/hera/tests"
        #   so os.path.abspath() can produce:
        #       ".../hera/hera/tests/hera/tests/..."  (WRONG)
        #
        # Solution:
        #   Build a list of candidate experiment paths and choose the first one
        #   that actually exists AND contains the expected "code/" directory.
        # ---------------------------------------------------------------------

        candidates = []

        # 1) Raw value as-is (might already be absolute)
        candidates.append(experimentPath_raw)

        # 2) Absolute relative to current working directory (old behavior)
        try:
            candidates.append(os.path.abspath(experimentPath_raw))
        except Exception:
            pass

        # 3) If a repo root is available, interpret raw path relative to it
        repo_root = os.environ.get("HERA_REPO_ROOT")
        if repo_root:
            candidates.append(os.path.join(repo_root, experimentPath_raw))

        # Pick the first candidate that looks valid (dir exists + has "code/" inside)
        experimentPath = None
        code_dir = None
        for cand in candidates:
            try:
                cand_abs = os.path.abspath(cand)
                cand_code_dir = os.path.join(cand_abs, self.CODE_DIRECTORY)

                # We consider it valid only if the experiment directory exists
                # and contains the expected "code" directory.
                if os.path.isdir(cand_abs) and os.path.isdir(cand_code_dir):
                    experimentPath = cand_abs
                    code_dir = cand_code_dir
                    break
            except Exception:
                continue

        # If none matched, fall back to the previous "best effort" behavior
        if experimentPath is None:
            # This keeps backward compatibility: we still try something even if the
            # path doesn't exist, so error messages remain informative.
            experimentPath = os.path.abspath(experimentPath_raw)
            code_dir = os.path.join(experimentPath, self.CODE_DIRECTORY)

        # Add experiment's 'code' directory to sys.path so we can import its toolkit.
        # Use insert(0, ...) to prefer this experiment's code over other paths.
        if code_dir in sys.path:
            # Move to front to avoid importing an older module from a different path.
            try:
                sys.path.remove(code_dir)
            except ValueError:
                pass
        sys.path.insert(0, code_dir)

        self.logger.debug(f"Experiment path (raw): {experimentPath_raw}")
        self.logger.debug(f"Experiment path (resolved): {experimentPath}")
        self.logger.debug(f"Adding path {code_dir} to sys.path (priority)")

        toolkitName = f"{experimentName}.{experimentName}"
        self.logger.debug(f"Loading toolkit: {toolkitName}")

        toolkitCls = pydoc.locate(toolkitName)
        if toolkitCls is None:
            err = f"Cannot find toolkit {toolkitName} in {code_dir}"
            self.logger.error(err)
            raise ValueError(err)

        return toolkitCls(
            projectName=self.projectName,
            pathToExperiment=experimentPath,
            filesDirectory=filesDirectory,
        )

    err = (
        f"Experiment {experimentName} not found in Project {self.projectName}. "
        f"Please load the experiment to the project."
    )
    self.logger.error(err)
    raise ValueError(err)

keys()

Get the experiment names of the project.

Returns:

Type Description
list of str
Source code in hera/measurements/experiment/experiment.py
def keys(self):
    """
    Get the experiment names of the project.

    Returns
    -------
    list of str
    """
    return [name for name in self.getExperimentsMap()]

__getitem__(item)

Allow experimentHome['expName'] syntax to return a specific experiment.

Source code in hera/measurements/experiment/experiment.py
def __getitem__(self, item):
    """
    Allow experimentHome['expName'] syntax to return a specific experiment.
    """
    return self.getExperiment(item)

experimentDataType()

Backward-compatibility hook for experiment data type.

Source code in hera/measurements/experiment/experiment.py
def experimentDataType(self):
    """
    Backward-compatibility hook for experiment data type.
    """
    return getattr(self, "_experimentDataType", None)