Skip to content

QTMpandas module

QTMPandas

Source code in vgridpandas\qtmpandas\qtmpandas.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
@pd.api.extensions.register_dataframe_accessor("qtm")
class QTMPandas:
    """Pandas accessor, registered as ``df.qtm``, applying QTM functions to a (Geo)DataFrame."""

    def __init__(self, df: DataFrame):
        # The wrapped frame; every public method returns a new frame and leaves this one untouched.
        self._df = df

    # QTM API
    # These methods simply mirror the Vgrid qtm API and apply QTM functions to all rows

    def latlon2qtm(
        self,
        resolution: int,
        lat_col: str = "lat",
        lon_col: str = "lon",
        set_index: bool = True,
    ) -> AnyDataFrame:
        """Adds qtm ID to (Geo)DataFrame.

        pd.DataFrame: uses `lat_col` and `lon_col` (default `lat` and `lon`)
        gpd.GeoDataFrame: uses `geometry`

        Assumes coordinates in epsg=4326.

        Parameters
        ----------
        resolution : int
            QTM resolution
        lat_col : str
            Name of the latitude column (if used), default 'lat'
        lon_col : str
            Name of the longitude column (if used), default 'lon'
        set_index : bool
            If True, the column with the QTM IDs is set as the index, default True

        Returns
        -------
        (Geo)DataFrame with the columns ``qtm`` (QTM IDs) and ``qtm_res`` (resolution) added
        """
        resolution = validate_qtm_resolution(resolution)

        frame = self._df
        if isinstance(frame, gpd.GeoDataFrame):
            # GeoDataFrame input: coordinates are read from the geometry column.
            points = frame.geometry
            lons, lats = points.x, points.y
        else:
            lons, lats = frame[lon_col], frame[lat_col]

        qtm_ids = [
            qtm.latlon_to_qtm_id(lat, lon, resolution) for lat, lon in zip(lats, lons)
        ]

        qtm_column = "qtm"
        tagged = frame.assign(**{qtm_column: qtm_ids, "qtm_res": resolution})
        return tagged.set_index(qtm_column) if set_index else tagged

    def qtm2geo(self, qtm_column: str = None) -> GeoDataFrame:
        """Add a ``geometry`` column built from QTM IDs and return a GeoDataFrame.

        Parameters
        ----------
        qtm_column : str, optional
            Name of the column containing QTM IDs (a single ID or a list of IDs per row).
            If None, the QTM IDs are taken from the index.

        Returns
        -------
        GeoDataFrame with QTM geometry (crs ``epsg:4326``)

        Raises
        ------
        ValueError
            When `qtm_column` is given but is not a column of the DataFrame.

        Notes
        -----
        In column mode, missing values and empty lists become an empty ``Polygon``,
        a non-empty list becomes a ``MultiPolygon`` of its cells, and a single value
        whose conversion raises ``ValueError``/``TypeError`` becomes an empty ``Polygon``.
        Errors raised while converting the members of a list propagate to the caller.
        In index mode, invalid IDs are handled by ``catch_invalid_dggs_id``.
        """
        if qtm_column is None:
            # QTM IDs are in the index
            return self._apply_index_assign(
                wrapped_partial(qtm_to_geo),
                "geometry",
                finalizer=lambda x: gpd.GeoDataFrame(x, crs="epsg:4326"),
            )

        if qtm_column not in self._df.columns:
            raise ValueError(f"Column '{qtm_column}' not found in DataFrame")

        def cell_geometry(value):
            # Lists are checked first: pd.isna() on a list returns an array whose
            # truth value is ambiguous, so it must never be used as a condition.
            if isinstance(value, list):
                if not value:
                    return Polygon()
                return MultiPolygon([qtm_to_geo(cell_id) for cell_id in value])
            try:
                if pd.isna(value):
                    return Polygon()
                return qtm_to_geo(value)
            except (ValueError, TypeError):
                # Best effort: an unconvertible single value yields an empty geometry.
                return Polygon()

        geometries = [cell_geometry(value) for value in self._df[qtm_column]]

        result_df = self._df.copy()
        result_df["geometry"] = geometries
        return gpd.GeoDataFrame(result_df, crs="epsg:4326")

    def polyfill(self, resolution: int, predicate: str = None, compact: bool = False, explode: bool = False) -> AnyDataFrame:
        """Apply the module-level ``polyfill`` function to each row's geometry.

        The resulting list of QTM IDs is stored in the ``COLUMN_QTM_POLYFILL`` column.

        Parameters
        ----------
        resolution : int
            QTM resolution
        predicate : str, optional
            Spatial predicate to apply ('intersect', 'within', 'centroid_within', 'largest_overlap')
        compact : bool, optional
            Whether to compact the QTM IDs
        explode : bool
            If True, will explode the resulting list vertically.
            All other columns' values are copied.
            Default: False

        Returns
        -------
        (Geo)DataFrame with the polyfill column added
        """
        resolution = validate_qtm_resolution(resolution)

        def cells_for(row):
            # `polyfill` here is the module-level function, not this method.
            return list(polyfill(row.geometry, resolution, predicate, compact))

        cell_lists = self._df.apply(cells_for, axis=1)

        if explode:
            # One row per cell; joined back on the index so other columns are repeated.
            exploded = cell_lists.explode().to_frame(COLUMN_QTM_POLYFILL)
            return self._df.join(exploded)

        return self._df.assign(**{COLUMN_QTM_POLYFILL: cell_lists})

    def qtmbin(
        self,
        resolution: int,
        stats: str = "count",
        numeric_column: str = None,
        category_column: str = None,
        lat_col: str = "lat",
        lon_col: str = "lon",
        return_geometry: bool = True,
    ) -> DataFrame:
        """
        Bin points into QTM cells and compute statistics, optionally grouped by a category column.

        Supports both GeoDataFrame (with point geometry) and DataFrame (with lat/lon columns).

        Parameters
        ----------
        resolution : int
            QTM resolution
        stats : str
            Statistic to compute: count, sum, min, max, mean, median, std, var, range, minority, majority, variety
        numeric_column : str, optional
            Name of the numeric column to aggregate (for sum, min, max, etc.) or the value column for minority/majority/variety stats
        category_column : str, optional
            Name of the category column to group by. When given, the result has one
            ``<category>_<stats>`` column per category.
        lat_col : str, optional
            Name of the latitude column (only used for DataFrame input, ignored for GeoDataFrame)
        lon_col : str, optional
            Name of the longitude column (only used for DataFrame input, ignored for GeoDataFrame)
        return_geometry : bool
            If True, return a GeoDataFrame with qtm cell geometry

        Returns
        -------
        (Geo)DataFrame with one row per QTM cell and the ``qtm`` ID as a regular column

        Raises
        ------
        ValueError
            If `category_column` or `numeric_column` is not in the DataFrame, if
            `numeric_column` is missing for a statistic that needs it, or if `stats`
            is not one of the supported names.
        """
        import warnings  # local import: only needed for the pivot-fallback warning

        categorical_stats = ("minority", "majority", "variety")

        # Validate inputs and prepare data
        resolution = validate_qtm_resolution(resolution)
        qtm_column = "qtm"
        df = self.latlon2qtm(resolution, lat_col, lon_col, False)
        # Keep only IDs whose string length equals the resolution. The copy makes the
        # category assignment below act on an independent frame instead of a slice.
        df = df[df[qtm_column].astype(str).str.len() == resolution].copy()

        # Validate column existence
        if category_column is not None and category_column not in df.columns:
            raise ValueError(f"Category column '{category_column}' not found in DataFrame")
        if numeric_column is not None and numeric_column not in df.columns:
            raise ValueError(f"Numeric column '{numeric_column}' not found in DataFrame")

        # Prepare grouping columns
        group_cols = [qtm_column]
        if category_column:
            # Missing categories get their own bucket instead of being dropped by groupby.
            df[category_column] = df[category_column].fillna("NaN_category")
            group_cols.append(category_column)

        # Perform aggregation based on stats type
        if stats == "count":
            result = df.groupby(group_cols).size().reset_index(name=stats)

        elif stats in ("sum", "min", "max", "mean", "median", "std", "var"):
            if not numeric_column:
                raise ValueError(f"numeric_column must be provided for stats='{stats}'")
            result = df.groupby(group_cols)[numeric_column].agg(stats).reset_index()

        elif stats == "range":
            if not numeric_column:
                raise ValueError(f"numeric_column must be provided for stats='{stats}'")
            result = df.groupby(group_cols)[numeric_column].agg(["min", "max"]).reset_index()
            result[stats] = result["max"] - result["min"]
            result = result.drop(["min", "max"], axis=1)

        elif stats in categorical_stats:
            if not numeric_column:
                raise ValueError(f"numeric_column must be provided for stats='{stats}'")

            def cat_agg_func(x):
                # Least/most frequent value, or number of distinct values, within one group.
                values = x[numeric_column].dropna()
                freq = Counter(values)
                if not freq:
                    return None
                if stats == "minority":
                    return min(freq.items(), key=lambda y: y[1])[0]
                if stats == "majority":
                    return max(freq.items(), key=lambda y: y[1])[0]
                return values.nunique()

            result = (
                df.groupby(group_cols)
                .apply(cat_agg_func, include_groups=False)
                .reset_index(name=stats)
            )
            if category_column:
                result = result.pivot(index=qtm_column, columns=category_column, values=stats)
                # Order columns by their string form but keep the original labels for
                # the lookup, so non-string categories (e.g. ints) keep their data.
                ordered = sorted(result.columns, key=str)
                result = result.reindex(columns=ordered).reset_index()
                result.columns = [qtm_column] + [f"{cat}_{stats}" for cat in ordered]
        else:
            raise ValueError(f"Unknown stats: {stats}")

        # Handle column renaming for non-categorical stats
        if len(result.columns) > len(group_cols) and not (category_column and stats in categorical_stats):
            result = result.rename(columns={result.columns[-1]: stats})

        # Handle category pivoting for non-categorical stats
        if category_column and stats not in categorical_stats:
            if len(result) == 0:
                result = pd.DataFrame(columns=[qtm_column, category_column, stats])
            else:
                try:
                    # Pivot categories to columns
                    pivoted = result.pivot(
                        index=qtm_column, columns=category_column, values=stats
                    ).fillna(0)
                    # Name columns in the pivot's own order; re-sorting the labels here
                    # could mislabel columns and fails on mixed-type labels.
                    labels = list(pivoted.columns)
                    pivoted = pivoted.reset_index()
                    pivoted.columns = [qtm_column] + [
                        f"NaN_{stats}" if label == "NaN_category" else f"{label}_{stats}"
                        for label in labels
                    ]
                    result = pivoted
                except Exception as err:
                    # Best-effort fallback kept for compatibility, but no longer silent:
                    # the returned column holds per-cell counts, not the requested stat.
                    warnings.warn(
                        f"Pivot by '{category_column}' failed ({err}); "
                        f"falling back to per-cell counts in column '{stats}'",
                        RuntimeWarning,
                        stacklevel=2,
                    )
                    result = df.groupby(qtm_column).size().reset_index(name=stats)

        # Add geometry if requested
        result = result.set_index(qtm_column)
        if return_geometry:
            result = result.qtm.qtm2geo()
        return result.reset_index()

    def _apply_index_assign(
        self,
        func: Callable,
        column_name: str,
        processor: Callable = lambda x: x,
        finalizer: Callable = lambda x: x,
    ) -> Any:
        """Helper method. Applies `func` to index and assigns the result to `column_name`.

        Parameters
        ----------
        func : Callable
            single-argument function to be applied to each QTM ID in the index
        column_name : str
            name of the resulting column
        processor : Callable
            (Optional) further processes the result of func. Default: identity
        finalizer : Callable
            (Optional) further processes the resulting dataframe. Default: identity

        Returns
        -------
        Dataframe with column `column_name` containing the result of `func`.
        If using `finalizer`, can return anything the `finalizer` returns.
        """
        func = catch_invalid_dggs_id(func)
        values = [processor(func(qtm_id)) for qtm_id in self._df.index]
        return finalizer(self._df.assign(**{column_name: values}))

    def _apply_index_explode(
        self,
        func: Callable,
        column_name: str,
        processor: Callable = lambda x: x,
        finalizer: Callable = lambda x: x,
    ) -> Any:
        """Helper method. Applies a list-making `func` to index and performs
        a vertical explode.
        Any additional values are simply copied to all the rows.

        Parameters
        ----------
        func : Callable
            single-argument function to be applied to each QTM ID in the index
        column_name : str
            name of the resulting column
        processor : Callable
            (Optional) further processes the result of func. Default: identity
        finalizer : Callable
            (Optional) further processes the resulting dataframe. Default: identity

        Returns
        -------
        Dataframe with column `column_name` containing the result of `func`.
        If using `finalizer`, can return anything the `finalizer` returns.
        """
        func = catch_invalid_dggs_id(func)
        per_id = {qtm_id: processor(func(qtm_id)) for qtm_id in self._df.index}
        # One wide row per ID, stacked into one long row per list element; the
        # positional level created by stack() is dropped so the ID is the only index.
        exploded = (
            pd.DataFrame.from_dict(per_id, orient="index")
            .stack()
            .to_frame(column_name)
            .reset_index(level=1, drop=True)
        )
        return finalizer(self._df.join(exploded))

    @staticmethod
    def _format_resolution(resolution: int) -> str:
        """Return the resolution-specific column name, e.g. ``qtm_05`` for resolution 5."""
        return f"qtm_{str(resolution).zfill(2)}"

latlon2qtm(resolution, lat_col='lat', lon_col='lon', set_index=True)

Adds qtm ID to (Geo)DataFrame.

pd.DataFrame: uses lat_col and lon_col (default lat and lon) gpd.GeoDataFrame: uses geometry

Assumes coordinates in epsg=4326.

Parameters

resolution : int QTM resolution lat_col : str Name of the latitude column (if used), default 'lat' lon_col : str Name of the longitude column (if used), default 'lon' set_index : bool If True, the column with the QTM IDs is set as the index, default True

Returns

(Geo)DataFrame with QTM IDs added

Source code in vgridpandas\qtmpandas\qtmpandas.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def latlon2qtm(
    self,
    resolution: int,
    lat_col: str = "lat",
    lon_col: str = "lon",
    set_index: bool = True,
) -> AnyDataFrame:
    """Adds qtm ID to (Geo)DataFrame.

    pd.DataFrame: uses `lat_col` and `lon_col` (default `lat` and `lon`)
    gpd.GeoDataFrame: uses `geometry`

    Assumes coordinates in epsg=4326.

    Parameters
    ----------
    resolution : int
        QTM resolution
    lat_col : str
        Name of the latitude column (if used), default 'lat'
    lon_col : str
        Name of the longitude column (if used), default 'lon'
    set_index : bool
        If True, the column with the QTM IDs is set as the index, default True

    Returns
    -------
    (Geo)DataFrame with the columns ``qtm`` (QTM IDs) and ``qtm_res`` (resolution) added
    """
    resolution = validate_qtm_resolution(resolution)

    frame = self._df
    if isinstance(frame, gpd.GeoDataFrame):
        # GeoDataFrame input: coordinates are read from the geometry column.
        points = frame.geometry
        lons, lats = points.x, points.y
    else:
        lons, lats = frame[lon_col], frame[lat_col]

    cell_ids = [
        qtm.latlon_to_qtm_id(lat, lon, resolution) for lat, lon in zip(lats, lons)
    ]

    qtm_column = "qtm"
    tagged = frame.assign(**{qtm_column: cell_ids, "qtm_res": resolution})
    return tagged.set_index(qtm_column) if set_index else tagged

polyfill(resolution, predicate=None, compact=False, explode=False)

Parameters

resolution : int QTM resolution predicate : str, optional Spatial predicate to apply ('intersect', 'within', 'centroid_within', 'largest_overlap') compact : bool, optional Whether to compact the QTM IDs explode : bool If True, will explode the resulting list vertically. All other columns' values are copied. Default: False

Source code in vgridpandas\qtmpandas\qtmpandas.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def polyfill(self, resolution: int, predicate: str = None, compact: bool = False, explode: bool = False) -> AnyDataFrame:
    """Apply the module-level ``polyfill`` function to each row's geometry.

    The resulting list of QTM IDs is stored in the ``COLUMN_QTM_POLYFILL`` column.

    Parameters
    ----------
    resolution : int
        QTM resolution
    predicate : str, optional
        Spatial predicate to apply ('intersect', 'within', 'centroid_within', 'largest_overlap')
    compact : bool, optional
        Whether to compact the QTM IDs
    explode : bool
        If True, will explode the resulting list vertically.
        All other columns' values are copied.
        Default: False

    Returns
    -------
    (Geo)DataFrame with the polyfill column added
    """
    resolution = validate_qtm_resolution(resolution)

    def cells_for(row):
        # In the source module this name resolves to the imported polyfill function.
        return list(polyfill(row.geometry, resolution, predicate, compact))

    cell_lists = self._df.apply(cells_for, axis=1)

    if explode:
        # One row per cell; joined back on the index so other columns are repeated.
        exploded = cell_lists.explode().to_frame(COLUMN_QTM_POLYFILL)
        return self._df.join(exploded)

    return self._df.assign(**{COLUMN_QTM_POLYFILL: cell_lists})

qtm2geo(qtm_column=None)

Add geometry with QTM geometry to the DataFrame. Assumes QTM ID.

Parameters

qtm_column : str, optional Name of the column containing QTM IDs. If None, assumes the QTM IDs are in the index.

Returns

GeoDataFrame with QTM geometry

Raises

ValueError When an invalid QTM ID is encountered

Source code in vgridpandas\qtmpandas\qtmpandas.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def qtm2geo(self, qtm_column: str = None) -> GeoDataFrame:
    """Add a ``geometry`` column built from QTM IDs and return a GeoDataFrame.

    Parameters
    ----------
    qtm_column : str, optional
        Name of the column containing QTM IDs (a single ID or a list of IDs per row).
        If None, the QTM IDs are taken from the index.

    Returns
    -------
    GeoDataFrame with QTM geometry (crs ``epsg:4326``)

    Raises
    ------
    ValueError
        When `qtm_column` is given but is not a column of the DataFrame.

    Notes
    -----
    In column mode, missing values and empty lists become an empty ``Polygon``,
    a non-empty list becomes a ``MultiPolygon`` of its cells, and a single value
    whose conversion raises ``ValueError``/``TypeError`` becomes an empty ``Polygon``.
    Errors raised while converting the members of a list propagate to the caller.
    In index mode, invalid IDs are handled by ``catch_invalid_dggs_id``.
    """
    if qtm_column is None:
        # QTM IDs are in the index
        return self._apply_index_assign(
            wrapped_partial(qtm_to_geo),
            "geometry",
            finalizer=lambda x: gpd.GeoDataFrame(x, crs="epsg:4326"),
        )

    if qtm_column not in self._df.columns:
        raise ValueError(f"Column '{qtm_column}' not found in DataFrame")

    def cell_geometry(value):
        # Lists are checked first: pd.isna() on a list returns an array whose
        # truth value is ambiguous, so it must never be used as a condition.
        if isinstance(value, list):
            if not value:
                return Polygon()
            return MultiPolygon([qtm_to_geo(cell_id) for cell_id in value])
        try:
            if pd.isna(value):
                return Polygon()
            return qtm_to_geo(value)
        except (ValueError, TypeError):
            # Best effort: an unconvertible single value yields an empty geometry.
            return Polygon()

    geometries = [cell_geometry(value) for value in self._df[qtm_column]]

    result_df = self._df.copy()
    result_df["geometry"] = geometries
    return gpd.GeoDataFrame(result_df, crs="epsg:4326")

qtmbin(resolution, stats='count', numeric_column=None, category_column=None, lat_col='lat', lon_col='lon', return_geometry=True)

Bin points into QTM cells and compute statistics, optionally grouped by a category column.

Supports both GeoDataFrame (with point geometry) and DataFrame (with lat/lon columns).

Parameters

resolution : int QTM resolution stats : str Statistic to compute: count, sum, min, max, mean, median, std, var, range, minority, majority, variety numeric_column : str, optional Name of the numeric column to aggregate (for sum, min, max, etc.) or the value column for minority/majority/variety stats category_column : str, optional Name of the category column to group by. Required for minority, majority, and variety stats when grouping by category. lat_col : str, optional Name of the latitude column (only used for DataFrame input, ignored for GeoDataFrame) lon_col : str, optional Name of the longitude column (only used for DataFrame input, ignored for GeoDataFrame) return_geometry : bool If True, return a GeoDataFrame with qtm cell geometry

Source code in vgridpandas\qtmpandas\qtmpandas.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
def qtmbin(
    self,
    resolution: int,
    stats: str = "count",
    numeric_column: str = None,
    category_column: str = None,
    lat_col: str = "lat",
    lon_col: str = "lon",
    return_geometry: bool = True,
) -> DataFrame:
    """
    Bin points into QTM cells and compute statistics, optionally grouped by a category column.

    Supports both GeoDataFrame (with point geometry) and DataFrame (with lat/lon columns).

    Parameters
    ----------
    resolution : int
        QTM resolution
    stats : str
        Statistic to compute: count, sum, min, max, mean, median, std, var, range, minority, majority, variety
    numeric_column : str, optional
        Name of the numeric column to aggregate (for sum, min, max, etc.) or the value column for minority/majority/variety stats
    category_column : str, optional
        Name of the category column to group by. When given, the result has one
        ``<category>_<stats>`` column per category.
    lat_col : str, optional
        Name of the latitude column (only used for DataFrame input, ignored for GeoDataFrame)
    lon_col : str, optional
        Name of the longitude column (only used for DataFrame input, ignored for GeoDataFrame)
    return_geometry : bool
        If True, return a GeoDataFrame with qtm cell geometry

    Returns
    -------
    (Geo)DataFrame with one row per QTM cell and the ``qtm`` ID as a regular column

    Raises
    ------
    ValueError
        If `category_column` or `numeric_column` is not in the DataFrame, if
        `numeric_column` is missing for a statistic that needs it, or if `stats`
        is not one of the supported names.
    """
    import warnings  # local import: only needed for the pivot-fallback warning

    categorical_stats = ("minority", "majority", "variety")

    # Validate inputs and prepare data
    resolution = validate_qtm_resolution(resolution)
    qtm_column = "qtm"
    df = self.latlon2qtm(resolution, lat_col, lon_col, False)
    # Keep only IDs whose string length equals the resolution. The copy makes the
    # category assignment below act on an independent frame instead of a slice.
    df = df[df[qtm_column].astype(str).str.len() == resolution].copy()

    # Validate column existence
    if category_column is not None and category_column not in df.columns:
        raise ValueError(f"Category column '{category_column}' not found in DataFrame")
    if numeric_column is not None and numeric_column not in df.columns:
        raise ValueError(f"Numeric column '{numeric_column}' not found in DataFrame")

    # Prepare grouping columns
    group_cols = [qtm_column]
    if category_column:
        # Missing categories get their own bucket instead of being dropped by groupby.
        df[category_column] = df[category_column].fillna("NaN_category")
        group_cols.append(category_column)

    # Perform aggregation based on stats type
    if stats == "count":
        result = df.groupby(group_cols).size().reset_index(name=stats)

    elif stats in ("sum", "min", "max", "mean", "median", "std", "var"):
        if not numeric_column:
            raise ValueError(f"numeric_column must be provided for stats='{stats}'")
        result = df.groupby(group_cols)[numeric_column].agg(stats).reset_index()

    elif stats == "range":
        if not numeric_column:
            raise ValueError(f"numeric_column must be provided for stats='{stats}'")
        result = df.groupby(group_cols)[numeric_column].agg(["min", "max"]).reset_index()
        result[stats] = result["max"] - result["min"]
        result = result.drop(["min", "max"], axis=1)

    elif stats in categorical_stats:
        if not numeric_column:
            raise ValueError(f"numeric_column must be provided for stats='{stats}'")

        def cat_agg_func(x):
            # Least/most frequent value, or number of distinct values, within one group.
            values = x[numeric_column].dropna()
            freq = Counter(values)
            if not freq:
                return None
            if stats == "minority":
                return min(freq.items(), key=lambda y: y[1])[0]
            if stats == "majority":
                return max(freq.items(), key=lambda y: y[1])[0]
            return values.nunique()

        result = (
            df.groupby(group_cols)
            .apply(cat_agg_func, include_groups=False)
            .reset_index(name=stats)
        )
        if category_column:
            result = result.pivot(index=qtm_column, columns=category_column, values=stats)
            # Order columns by their string form but keep the original labels for
            # the lookup, so non-string categories (e.g. ints) keep their data.
            ordered = sorted(result.columns, key=str)
            result = result.reindex(columns=ordered).reset_index()
            result.columns = [qtm_column] + [f"{cat}_{stats}" for cat in ordered]
    else:
        raise ValueError(f"Unknown stats: {stats}")

    # Handle column renaming for non-categorical stats
    if len(result.columns) > len(group_cols) and not (category_column and stats in categorical_stats):
        result = result.rename(columns={result.columns[-1]: stats})

    # Handle category pivoting for non-categorical stats
    if category_column and stats not in categorical_stats:
        if len(result) == 0:
            result = pd.DataFrame(columns=[qtm_column, category_column, stats])
        else:
            try:
                # Pivot categories to columns
                pivoted = result.pivot(
                    index=qtm_column, columns=category_column, values=stats
                ).fillna(0)
                # Name columns in the pivot's own order; re-sorting the labels here
                # could mislabel columns and fails on mixed-type labels.
                labels = list(pivoted.columns)
                pivoted = pivoted.reset_index()
                pivoted.columns = [qtm_column] + [
                    f"NaN_{stats}" if label == "NaN_category" else f"{label}_{stats}"
                    for label in labels
                ]
                result = pivoted
            except Exception as err:
                # Best-effort fallback kept for compatibility, but no longer silent:
                # the returned column holds per-cell counts, not the requested stat.
                warnings.warn(
                    f"Pivot by '{category_column}' failed ({err}); "
                    f"falling back to per-cell counts in column '{stats}'",
                    RuntimeWarning,
                    stacklevel=2,
                )
                result = df.groupby(qtm_column).size().reset_index(name=stats)

    # Add geometry if requested
    result = result.set_index(qtm_column)
    if return_geometry:
        result = result.qtm.qtm2geo()
    return result.reset_index()