Modeling_Tool.Feature

Feature analysis layer: PSI, IV, correlation, and distribution.

PSI (population stability): PSI_Tool

PSI_Tool

PSI (Population Stability Index) Calculator Module

This module provides functions and classes for calculating Population Stability Index (PSI) to measure the distribution drift between expected and actual datasets.
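
As a rough illustration of the quantity being measured, the sketch below computes PSI from two lists of bin proportions. It follows the formula visible in the commented-out `_calculate_single_psi` listing further down, `(actual - expected) * log(actual / expected)` summed over bins, with each proportion floored at a small constant. The function name `psi_from_proportions` and the `eps` argument are illustrative assumptions, not part of the module.

```python
import math


def psi_from_proportions(expected, actual, eps=1e-6):
    """PSI between two lists of bin proportions defined over the same bins.

    Hypothetical helper for illustration; not part of PSI_Tool.
    """
    if len(expected) != len(actual):
        raise ValueError("both inputs must cover the same bins")
    total = 0.0
    for e, a in zip(expected, actual):
        # Floor each proportion at eps so an empty bin cannot cause log(0) or x / 0.
        e = max(e, eps)
        a = max(a, eps)
        total += (a - e) * math.log(a / e)
    return total


baseline = [0.25, 0.25, 0.25, 0.25]
shifted = [0.10, 0.20, 0.30, 0.40]
print(psi_from_proportions(baseline, baseline))  # identical distributions
print(psi_from_proportions(baseline, shifted))   # mass moved toward the upper bins
```

Identical distributions contribute nothing to the sum, and every bin whose share changes adds a positive term, so the value grows as the two distributions move apart.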

Author: Matrix Agent

PSICalculator

A class for calculating Population Stability Index (PSI) with configurable parameters.

This class encapsulates common PSI calculation parameters and provides methods for various PSI calculations including single variable, grouped, and multi-variable comparisons between datasets.

Parameters:

Name          Type   Description                                  Default
buckets       int    Number of bins used for binning.             10
equal_freq    bool   Whether to use equal-frequency binning.      True
min_bin_prop  float  Minimum proportion of rows in each bin.      0.05
content       float  Small value used to avoid division by zero.  1e-06
precision     int    Decimal precision for results.               5

Examples:

>>> calculator = PSICalculator(buckets=10, equal_freq=True)
>>> psi = calculator.calculate(expected_df, actual_df, ['score'])
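
To make the `buckets` and `equal_freq` parameters concrete, here is a minimal standard-library sketch of how cut points could be derived in each mode. The module delegates binning to `quick_binning`, whose implementation is not shown on this page, so `make_breakpoints` is a hypothetical stand-in and `min_bin_prop` is not modeled.

```python
import statistics


def make_breakpoints(values, buckets=10, equal_freq=True):
    """Return the interior cut points that split `values` into `buckets` bins.

    Hypothetical helper for illustration; not the module's quick_binning.
    """
    clean = sorted(v for v in values if v is not None)
    if equal_freq:
        # Quantile cut points: each bin holds roughly the same number of rows.
        cuts = statistics.quantiles(clean, n=buckets, method="inclusive")
    else:
        # Evenly spaced cut points between the minimum and the maximum.
        lo, hi = clean[0], clean[-1]
        step = (hi - lo) / buckets
        cuts = [lo + step * i for i in range(1, buckets)]
    # Heavily tied data can yield repeated cut points; keep each one once.
    return sorted(set(cuts))


scores = list(range(1, 101))
print(make_breakpoints(scores, buckets=4, equal_freq=True))
print(make_breakpoints(scores, buckets=4, equal_freq=False))
```

Equal-frequency cuts adapt to skewed data, while equal-width cuts can leave some bins nearly empty, which is where a small floor such as `content` matters.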
Source code in Modeling_Tool/Feature/PSI_Tool.py
class PSICalculator:
    """
    A class for calculating Population Stability Index (PSI) with configurable parameters.

    This class encapsulates common PSI calculation parameters and provides methods
    for various PSI calculations including single variable, grouped, and multi-variable
    comparisons between datasets.

    Parameters
    ----------
    buckets : int, optional
        Number of bins for binning. Default is 10.
    equal_freq : bool, optional
        Whether to use equal frequency binning. Default is True.
    min_bin_prop : float, optional
        Minimum proportion for each bin. Default is 0.05.
    content : float, optional
        Small value to avoid division by zero. Default is 1e-6.
    precision : int, optional
        Decimal precision for results. Default is 5.

    Examples
    --------
    >>> calculator = PSICalculator(buckets=10, equal_freq=True)
    >>> psi = calculator.calculate(expected_df, actual_df, ['score'])
    """

    def __init__(
        self,
        buckets: int = 10,
        equal_freq: bool = True,
        min_bin_prop: float = 0.05,
        content: float = 1e-6,
        precision: int = 5
    ):
        """
        Initialize PSICalculator with configuration parameters.

        Parameters
        ----------
        buckets : int, optional
            Number of bins for binning. Default is 10.
        equal_freq : bool, optional
            Use equal frequency binning if True. Default is True.
        min_bin_prop : float, optional
            Minimum proportion for each bin. Default is 0.05.
        content : float, optional
            Small value to prevent division by zero. Default is 1e-6.
        precision : int, optional
            Decimal precision for rounding. Default is 5.
        """
        self.buckets = buckets
        self.equal_freq = equal_freq
        self.min_bin_prop = min_bin_prop
        self.content = content
        self.precision = precision

#     def _calculate_single_psi(
#         self,
#         expected_series: pd.Series,
#         actual_series: pd.Series,
#         return_details: bool = False
#     ) -> Union[float, Tuple[float, pd.DataFrame]]:
#         """
#         Calculate PSI for a single variable.

#         This method performs binning on both expected and actual series,
#         then calculates the PSI value based on the distribution difference.

#         Parameters
#         ----------
#         expected_series : pandas.Series
#             Expected/baseline data series.
#         actual_series : pandas.Series
#             Actual/comparison data series.
#         return_details : bool, optional
#             Whether to return detailed bin information. Default is False.

#         Returns
#         -------
#         float or tuple
#             If return_details is False: Returns total PSI value.
#             If return_details is True: Returns tuple of (PSI value, details DataFrame).
#         """
#         # Drop NA
#         expected_clean = expected_series.dropna()
#         actual_clean = actual_series.dropna()

#         # Bin the expected data to get breakpoints
#         expected_bins, breakpoints = quick_binning(
#             pd.DataFrame(expected_clean), 
#             expected_clean.name, 
#             labels=None, 
#             nbins=self.buckets, 
#             precision=self.precision, 
#             equal_freq=self.equal_freq, 
#             right=True, 
#             include_lowest=False,
#             min_bin_prop=self.min_bin_prop, 
#             tree_binning=False, 
#             target=None, 
#             random_state=42
#         )

#         # Bin the actual data using the same breakpoints
#         actual_bins, _ = quick_binning(
#             pd.DataFrame(actual_clean), 
#             actual_clean.name, 
#             labels=None, 
#             nbins=list(breakpoints), 
#             precision=self.precision, 
#             equal_freq=self.equal_freq, 
#             right=True, 
#             include_lowest=False,
#             min_bin_prop=self.min_bin_prop, 
#             tree_binning=False, 
#             target=None, 
#             random_state=42
#         )

#         # Get bin proportions
#         expected_percents = expected_bins.value_counts(normalize=True, sort=False)
#         actual_percents = actual_bins.value_counts(normalize=True, sort=False)

#         # Ensure both series have the same bin indices
#         all_bins = expected_percents.index.union(actual_percents.index)
#         expected_percents = expected_percents.reindex(all_bins, fill_value=self.content)
#         actual_percents = actual_percents.reindex(all_bins, fill_value=self.content)

#         # Clip to avoid division by zero
#         expected_percents = expected_percents.clip(lower=self.content)
#         actual_percents = actual_percents.clip(lower=self.content)

#         # Calculate PSI
#         psi_values = (actual_percents - expected_percents) * np.log(actual_percents / expected_percents)
#         psi_total = psi_values.sum()

#         if return_details:
#             details = pd.DataFrame({
#                 'expected_percent': expected_percents,
#                 'actual_percent': actual_percents,
#                 'psi_component': psi_values
#             })
#             return psi_total, details
#         else:
#             return psi_total

#     def calculate_psi(
#         self,
#         expected: Union[pd.DataFrame, pd.Series],
#         actual: Union[pd.DataFrame, pd.Series],
#         target_col: str,
#         group_by: Optional[Union[str, List[str]]] = None,
#         return_details: bool = False
#     ) -> Union[float, pd.DataFrame, Tuple[Dict, Dict]]:
#         """
#         Calculate PSI for a variable, optionally by groups.

#         Parameters
#         ----------
#         expected : pandas.DataFrame or pandas.Series
#             Expected/baseline data.
#         actual : pandas.DataFrame or pandas.Series
#             Actual/comparison data.
#         target_col : str
#             Column name to calculate PSI for.
#         group_by : str or list, optional
#             Column(s) to group by. Default is None (no grouping).
#         return_details : bool, optional
#             Whether to return detailed bin information. Default is False.

#         Returns
#         -------
#         float, pandas.DataFrame, or tuple
#             PSI value(s) and optionally details.
#         """
#         if group_by is not None:
#             if isinstance(group_by, str):
#                 group_by = [group_by]

#             expected_subset = expected[[target_col] + group_by].copy()
#             actual_subset = actual[[target_col] + group_by].copy()

#             expected_subset = expected_subset.copy()
#             actual_subset = actual_subset.copy()

#             expected_subset['_dataset'] = 'expected'
#             actual_subset['_dataset'] = 'actual'

#             combined = pd.concat([expected_subset, actual_subset], ignore_index=True)

#             results = {}
#             details_dict = {}

#             for group, group_data in combined.groupby(group_by):
#                 expected_group = group_data[group_data['_dataset'] == 'expected'].drop('_dataset', axis=1)
#                 actual_group = group_data[group_data['_dataset'] == 'actual'].drop('_dataset', axis=1)

#                 if expected_group.shape[0] == 0 or actual_group.shape[0] == 0:
#                     results[group] = 999999
#                     continue

#                 if return_details:
#                     psi_value, detail = self._calculate_single_psi(
#                         expected_group[target_col], 
#                         actual_group[target_col],
#                         return_details=True
#                     )
#                     results[group] = psi_value
#                     details_dict[group] = detail
#                 else:
#                     results[group] = self._calculate_single_psi(
#                         expected_group[target_col], 
#                         actual_group[target_col]
#                     )

#             if return_details:
#                 return results, details_dict
#             else:
#                 return pd.DataFrame(results, index=['psi']).T

#         else:
#             if return_details:
#                 return self._calculate_single_psi(
#                     expected[target_col], 
#                     actual[target_col],
#                     return_details=True
#                 )
#             else:
#                 return self._calculate_single_psi(
#                     expected[target_col], 
#                     actual[target_col]
#                 )

#     def calculate_within_psi(
#         self,
#         data: pd.DataFrame,
#         grp_name: str,
#         target_col: str,
#         benchmark: Optional[Any] = None,
#         return_details: bool = False,
#         benchmark_display_name: Optional[str] = None
#     ) -> Union[pd.DataFrame, Dict]:
#         """
#         Calculate PSI values within a single dataset, comparing groups to a benchmark.

#         Parameters
#         ----------
#         data : pandas.DataFrame
#             Input dataset containing all groups.
#         grp_name : str
#             Column name for grouping.
#         target_col : str
#             Column name to calculate PSI for.
#         benchmark : str or callable, optional
#             Benchmark group value or filter function. If None, uses first group.
#         return_details : bool, optional
#             Whether to return detailed bin information. Default is False.
#         benchmark_display_name : str, optional
#             Custom name for benchmark in results.

#         Returns
#         -------
#         pandas.DataFrame or dict
#             PSI results by group, or dict with 'psi' and 'details' keys.
#         """
#         if callable(benchmark):
#             benchmark_data = data[benchmark(data)]
#         else:
#             benchmark_data = data[data[grp_name] == benchmark] if benchmark is not None else data

#         obs_values = [x for x in data[grp_name].unique().tolist() if x != benchmark]

#         res_dict = {benchmark_display_name: 0} if benchmark_display_name is not None else {}
#         detail_dict = {}

#         for obs_value in obs_values:
#             obs_data = data[data[grp_name] == obs_value]

#             if return_details:
#                 psi, details = self.calculate_psi(
#                     benchmark_data, 
#                     obs_data, 
#                     target_col=target_col, 
#                     return_details=True
#                 )
#                 res_dict[obs_value] = psi
#                 detail_dict[obs_value] = details
#             else:
#                 psi = self.calculate_psi(
#                     benchmark_data, 
#                     obs_data, 
#                     target_col=target_col
#                 )
#                 res_dict[obs_value] = psi

#         fnl_res = pd.DataFrame(res_dict, index=['psi']).T\
#             .reset_index(drop=False)\
#             .rename(columns={"index": grp_name})

#         if return_details:
#             return {"psi": fnl_res, "details": detail_dict}
#         else:
#             return fnl_res

#     def calculate_psi_within_dataset(
#         self,
#         data: pd.DataFrame,
#         grp_name: str,
#         varlist: List[str],
#         benchmark: Optional[Any] = None
#     ) -> pd.DataFrame:
#         """
#         Calculate PSI for multiple variables within a dataset.

#         Parameters
#         ----------
#         data : pandas.DataFrame
#             Input dataset.
#         grp_name : str
#             Column name for grouping.
#         varlist : list
#             List of variable names to calculate PSI for.
#         benchmark : str or callable, optional
#             Benchmark group value or filter function.

#         Returns
#         -------
#         pandas.DataFrame
#             Combined PSI results for all variables.
#         """
#         fnl_psi_res = []
#         for var in tqdm(varlist):
#             single_psi = self.calculate_within_psi(
#                 data=data, 
#                 grp_name=grp_name, 
#                 benchmark=benchmark, 
#                 target_col=var
#             ).sort_values([grp_name]).reset_index(drop=True)

#             single_psi['var'] = var
#             fnl_psi_res.append(single_psi)

#         return pd.concat(fnl_psi_res)

#     def calculate_multivar_psi_two_sets(
#         self,
#         expected_df: pd.DataFrame,
#         actual_df: pd.DataFrame,
#         varlist: List[str],
#         group_by: Optional[Union[str, List[str]]] = None
#     ) -> pd.DataFrame:
#         """
#         Calculate PSI for multiple variables by comparing two datasets.

#         Parameters
#         ----------
#         expected_df : pandas.DataFrame
#             Expected/baseline dataset.
#         actual_df : pandas.DataFrame
#             Actual/comparison dataset.
#         varlist : list
#             List of variable names to calculate PSI for.
#         group_by : str or list, optional
#             Column(s) to group by.

#         Returns
#         -------
#         pandas.DataFrame
#             PSI results for all variables.
#         """
#         multi_psi_res = []
#         for var in tqdm(varlist):
#             single_psi = self.calculate_psi(
#                 expected=expected_df, 
#                 actual=actual_df, 
#                 target_col=var, 
#                 group_by=group_by
#             )
#             if group_by is None:
#                 single_psi = pd.DataFrame([single_psi], columns=['psi'])
#             single_psi['var'] = var
#             multi_psi_res.append(single_psi)

#         return pd.concat(multi_psi_res)

    def calculate(
        self,
        expected_df: pd.DataFrame,
        current_data: pd.DataFrame,
        varlist: List[str],
        group_by: Optional[str] = None,
        group_name: Optional[str] = None,
        return_details: bool = False
    ) -> pd.DataFrame:
        """
        Calculate grouped PSI comparing two datasets, using expected as benchmark.

        Parameters
        ----------
        expected_df : pandas.DataFrame
            Expected/baseline dataset.
        current_data : pandas.DataFrame
            Actual/comparison dataset.
        varlist : list
            List of variable names.
        group_by : str, optional
            Column to group by in both datasets.
        group_name : str, optional
            Specific group column name for multi-group calculation.
        return_details : bool, optional
            Forwarded unchanged to calculate_multigroup_psi_two_sets. Default is False.

        Returns
        -------
        pandas.DataFrame
            Grouped PSI results.
        """
        # Delegate to the module-level helper, passing the instance configuration.
        return calculate_multigroup_psi_two_sets(
            expected_df=expected_df,
            actual_df=current_data,
            varlist=varlist,
            group_by=group_by,
            buckets=self.buckets,
            equal_freq=self.equal_freq,
            min_bin_prop=self.min_bin_prop,
            content=self.content,
            precision=self.precision,
            group_name=group_name,
            return_details=return_details
        )

calculate

calculate(expected_df: DataFrame, current_data: DataFrame, varlist: List[str], group_by: Optional[str] = None, group_name: Optional[str] = None, return_details=False) -> DataFrame

Calculate grouped PSI comparing two datasets, using expected as benchmark.

Parameters:

Name            Type       Description                                                Default
expected_df     DataFrame  Expected/baseline dataset.                                 required
current_data    DataFrame  Actual/comparison dataset.                                 required
varlist         list       List of variable names.                                    required
group_by        str        Column to group by in both datasets.                       None
group_name      str        Specific group column name for multi-group calculation.    None
return_details  bool       Forwarded unchanged to calculate_multigroup_psi_two_sets.  False

Returns:

Type       Description
DataFrame  Grouped PSI results.

Source code in Modeling_Tool/Feature/PSI_Tool.py
def calculate(
    self,
    expected_df: pd.DataFrame,
    current_data: pd.DataFrame,
    varlist: List[str],
    group_by: Optional[str] = None,
    group_name: Optional[str] = None,
    return_details: bool = False
) -> pd.DataFrame:
    """
    Calculate grouped PSI comparing two datasets, using expected as benchmark.

    Parameters
    ----------
    expected_df : pandas.DataFrame
        Expected/baseline dataset.
    current_data : pandas.DataFrame
        Actual/comparison dataset.
    varlist : list
        List of variable names.
    group_by : str, optional
        Column to group by in both datasets.
    group_name : str, optional
        Specific group column name for multi-group calculation.
    return_details : bool, optional
        Forwarded unchanged to calculate_multigroup_psi_two_sets. Default is False.

    Returns
    -------
    pandas.DataFrame
        Grouped PSI results.
    """
    # Delegate to the module-level helper, passing the instance configuration.
    return calculate_multigroup_psi_two_sets(
        expected_df=expected_df,
        actual_df=current_data,
        varlist=varlist,
        group_by=group_by,
        buckets=self.buckets,
        equal_freq=self.equal_freq,
        min_bin_prop=self.min_bin_prop,
        content=self.content,
        precision=self.precision,
        group_name=group_name,
        return_details=return_details
    )

calculate_psi

calculate_psi(expected: Union[DataFrame, Series], actual: Union[DataFrame, Series], target_col: str, buckets: int = 10, equal_freq: bool = True, group_by: Optional[Union[str, List[str]]] = None, return_details: bool = False, min_bin_prop: float = 0.05, content: float = 1e-06, precision: int = 5) -> Union[float, DataFrame, Tuple[Dict, Dict]]

Calculate Population Stability Index (PSI) for a variable, optionally by groups.

This function computes PSI to measure the distribution shift between expected (baseline) and actual (comparison) datasets for a specified variable.

Parameters:

Name            Type                 Description                                                Default
expected        DataFrame or Series  Expected/baseline data.                                    required
actual          DataFrame or Series  Actual/comparison data.                                    required
target_col      str                  Column name to calculate PSI for.                          required
buckets         int                  Number of bins.                                            10
equal_freq      bool                 Use equal-frequency binning.                               True
group_by        str or list          Column(s) to group by for stratified PSI calculation.      None
return_details  bool                 Return detailed bin information.                           False
min_bin_prop    float                Minimum proportion for each bin.                           0.05
content         float                Small value to avoid division by zero.                     1e-06
precision       int                  Decimal precision.                                         5

Returns:

Type  Description
float, pandas.DataFrame, or tuple
  • If group_by is None and return_details is False: Single PSI float value.
  • If group_by is None and return_details is True: Tuple of (psi_value, details_dict).
  • If group_by is set and return_details is False: DataFrame with PSI values per group.
  • If group_by is set and return_details is True: Tuple of (results_dict, details_dict).

Examples:

>>> # Simple PSI calculation
>>> psi = calculate_psi(expected_df, actual_df, 'score')
>>> # PSI by groups
>>> psi_by_region = calculate_psi(expected_df, actual_df, 'score', group_by='region')
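
The grouped branch can be pictured with a small standard-library sketch. Like the source listing, it assigns the sentinel value 999999 to any group that has no rows on one side. It is a simplification under stated assumptions: rows are plain `(group, value)` pairs, the cut points are passed in rather than derived per group from the expected data, and the names `bin_shares`, `psi`, and `grouped_psi` are hypothetical.

```python
import bisect
import math
from collections import defaultdict

MISSING_GROUP_PSI = 999999  # sentinel the listing uses when one side has no rows


def bin_shares(values, cuts):
    """Share of values in each right-closed bin delimited by sorted cut points."""
    counts = [0] * (len(cuts) + 1)
    for v in values:
        counts[bisect.bisect_left(cuts, v)] += 1
    return [c / len(values) for c in counts]


def psi(expected, actual, cuts, eps=1e-6):
    """PSI of `actual` against `expected` over shared cut points."""
    total = 0.0
    for e, a in zip(bin_shares(expected, cuts), bin_shares(actual, cuts)):
        e, a = max(e, eps), max(a, eps)  # floor empty bins
        total += (a - e) * math.log(a / e)
    return total


def grouped_psi(expected_rows, actual_rows, cuts):
    """Rows are (group, value) pairs; returns {group: psi}."""
    by_group = defaultdict(lambda: {"expected": [], "actual": []})
    for group, value in expected_rows:
        by_group[group]["expected"].append(value)
    for group, value in actual_rows:
        by_group[group]["actual"].append(value)

    results = {}
    for group, sides in by_group.items():
        if not sides["expected"] or not sides["actual"]:
            # The group exists in only one dataset, so no comparison is possible.
            results[group] = MISSING_GROUP_PSI
            continue
        results[group] = psi(sides["expected"], sides["actual"], cuts)
    return results
```

Callers that aggregate grouped results may want to filter out the sentinel first, since it is not a real PSI value.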
Source code in Modeling_Tool/Feature/PSI_Tool.py
def calculate_psi(
    expected: Union[pd.DataFrame, pd.Series],
    actual: Union[pd.DataFrame, pd.Series],
    target_col: str,
    buckets: int = 10,
    equal_freq: bool = True,
    group_by: Optional[Union[str, List[str]]] = None,
    return_details: bool = False,
    min_bin_prop: float = 0.05,
    content: float = 1e-6,
    precision: int = 5
) -> Union[float, pd.DataFrame, Tuple[Dict, Dict]]:
    """
    Calculate Population Stability Index (PSI) for a variable, optionally by groups.

    This function computes PSI to measure the distribution shift between expected
    (baseline) and actual (comparison) datasets for a specified variable.

    Parameters
    ----------
    expected : pandas.DataFrame or pandas.Series
        Expected/baseline data.
    actual : pandas.DataFrame or pandas.Series
        Actual/comparison data.
    target_col : str
        Column name to calculate PSI for.
    buckets : int, optional
        Number of bins. Default is 10.
    equal_freq : bool, optional
        Use equal frequency binning. Default is True.
    group_by : str or list, optional
        Column(s) to group by for stratified PSI calculation. Default is None.
    return_details : bool, optional
        Return detailed bin information. Default is False.
    min_bin_prop : float, optional
        Minimum proportion for each bin. Default is 0.05.
    content : float, optional
        Small value to avoid division by zero. Default is 1e-6.
    precision : int, optional
        Decimal precision. Default is 5.

    Returns
    -------
    float, pandas.DataFrame, or tuple
        - If group_by is None and return_details is False: Single PSI float value.
        - If group_by is None and return_details is True: Tuple of (psi_value, details_dict).
        - If group_by is set and return_details is False: DataFrame with PSI values per group.
        - If group_by is set and return_details is True: Tuple of (results_dict, details_dict).

    Examples
    --------
    >>> # Simple PSI calculation
    >>> psi = calculate_psi(expected_df, actual_df, 'score')

    >>> # PSI by groups
    >>> psi_by_region = calculate_psi(expected_df, actual_df, 'score', group_by='region')
    """
    if group_by is not None:
        if isinstance(group_by, str):
            group_by = [group_by]

        # Copy only the needed columns so the tagging below does not touch the inputs.
        expected_subset = expected[[target_col] + group_by].copy()
        actual_subset = actual[[target_col] + group_by].copy()

        expected_subset['_dataset'] = 'expected'
        actual_subset['_dataset'] = 'actual'

        combined = pd.concat([expected_subset, actual_subset], ignore_index=True)

        results = {}
        details_dict = {}

        for group, group_data in combined.groupby(group_by):
            expected_group = group_data[group_data['_dataset'] == 'expected'].drop('_dataset', axis=1)
            actual_group = group_data[group_data['_dataset'] == 'actual'].drop('_dataset', axis=1)

            if expected_group.shape[0] == 0 or actual_group.shape[0] == 0:
                results[group] = 999999
                continue

            if return_details:
                psi_value, detail = _calculate_single_psi(
                    expected_group[target_col], 
                    actual_group[target_col], 
                    buckets, 
                    equal_freq,
                    True,
                    min_bin_prop,
                    content,
                    precision
                )
                results[group] = psi_value
                details_dict[group] = detail
            else:
                results[group] = _calculate_single_psi(
                    expected_group[target_col], 
                    actual_group[target_col], 
                    buckets, 
                    equal_freq,
                    False,
                    min_bin_prop,
                    content,
                    precision
                )

        if return_details:
            return results, details_dict
        else:
            return pd.DataFrame(results, index=['psi']).T

    else:
        if return_details:
            return _calculate_single_psi(
                expected[target_col], 
                actual[target_col], 
                buckets, 
                equal_freq, 
                True, 
                min_bin_prop, 
                content, 
                precision
            )
        else:
            return _calculate_single_psi(
                expected[target_col], 
                actual[target_col], 
                buckets, 
                equal_freq, 
                False, 
                min_bin_prop, 
                content, 
                precision
            )

calculate_within_psi

calculate_within_psi(data: DataFrame, grp_name: str, target_col: str, benchmark: Optional[Any] = None, equal_freq: bool = True, buckets: int = 10, return_details: bool = False, min_bin_prop: float = 0.05, content: float = 1e-06, precision: int = 5, benchmark_display_name: Optional[str] = None) -> Union[DataFrame, Dict]

Calculate PSI values within a single dataset, comparing groups to a benchmark.

This function computes PSI between a benchmark group and all other groups in a specified column, useful for monitoring population stability over time.

Parameters:

Name                    Type             Description                                                                                         Default
data                    DataFrame        Input dataset containing all groups.                                                                required
grp_name                str              Column name for grouping.                                                                           required
target_col              str              Column name to calculate PSI for.                                                                   required
benchmark               str or callable  Benchmark group value or filter function. If None, the whole dataset is used as the benchmark.      None
equal_freq              bool             Use equal-frequency binning.                                                                        True
buckets                 int              Number of bins.                                                                                     10
return_details          bool             Return detailed bin information.                                                                    False
min_bin_prop            float            Minimum proportion for each bin.                                                                    0.05
content                 float            Small value to avoid division by zero.                                                              1e-06
precision               int              Decimal precision.                                                                                  5
benchmark_display_name  str              Custom name for benchmark in results.                                                               None

Returns:

Type               Description
DataFrame or dict  If return_details is False: DataFrame with PSI values per group. If return_details is True: Dict with 'psi' (DataFrame) and 'details' (dict).

Examples:

>>> # Compare all months against January
>>> psi_results = calculate_within_psi(data, 'month', 'score', benchmark='2024-01')
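
The way the benchmark is resolved is easy to misread, so here is a minimal standard-library sketch of that step alone. In the source listing, a callable benchmark is applied to the DataFrame to select rows, a plain value selects the rows of that group, and `None` falls back to the whole dataset. The sketch uses `(group, value)` pairs and a per-row predicate instead of a DataFrame, and `split_benchmark` is a hypothetical name.

```python
def split_benchmark(rows, benchmark=None):
    """Split (group, value) rows into benchmark values and per-group values.

    Hypothetical helper for illustration; not part of PSI_Tool.
    """
    if callable(benchmark):
        # Assumption: a per-row predicate stands in for the DataFrame filter.
        bench = [v for g, v in rows if benchmark(g, v)]
    elif benchmark is None:
        # No benchmark given: every row serves as the baseline.
        bench = [v for _, v in rows]
    else:
        bench = [v for g, v in rows if g == benchmark]

    observed = {}
    for g, v in rows:
        # Only a group equal to the benchmark value is left out of the comparison.
        if g != benchmark:
            observed.setdefault(g, []).append(v)
    return bench, observed


rows = [("2024-01", 610), ("2024-01", 655), ("2024-02", 640), ("2024-03", 700)]
bench, observed = split_benchmark(rows, benchmark="2024-01")
print(bench, sorted(observed))
```

With a callable or `None` benchmark, no group is excluded, so every group is compared against a baseline that may include its own rows.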
Source code in Modeling_Tool/Feature/PSI_Tool.py
def calculate_within_psi(
    data: pd.DataFrame,
    grp_name: str,
    target_col: str,
    benchmark: Optional[Any] = None,
    equal_freq: bool = True,
    buckets: int = 10,
    return_details: bool = False,
    min_bin_prop: float = 0.05,
    content: float = 1e-6,
    precision: int = 5,
    benchmark_display_name: Optional[str] = None
) -> Union[pd.DataFrame, Dict]:
    """
    Calculate PSI values within a single dataset, comparing groups to a benchmark.

    This function computes PSI between a benchmark group and all other groups
    in a specified column, useful for monitoring population stability over time.

    Parameters
    ----------
    data : pandas.DataFrame
        Input dataset containing all groups.
    grp_name : str
        Column name for grouping.
    target_col : str
        Column name to calculate PSI for.
    benchmark : str or callable, optional
        Benchmark group value or filter function. If None, the whole dataset
        is used as the benchmark.
    equal_freq : bool, optional
        Use equal frequency binning. Default is True.
    buckets : int, optional
        Number of bins. Default is 10.
    return_details : bool, optional
        Return detailed bin information. Default is False.
    min_bin_prop : float, optional
        Minimum proportion for each bin. Default is 0.05.
    content : float, optional
        Small value to avoid division by zero. Default is 1e-6.
    precision : int, optional
        Decimal precision. Default is 5.
    benchmark_display_name : str, optional
        Custom name for benchmark in results.

    Returns
    -------
    pandas.DataFrame or dict
        If return_details is False: DataFrame with PSI values per group.
        If return_details is True: Dict with 'psi' (DataFrame) and 'details' (dict).

    Examples
    --------
    >>> # Compare all months against January
    >>> psi_results = calculate_within_psi(data, 'month', 'score', benchmark='2024-01')
    """
    if callable(benchmark):
        benchmark_data = data[benchmark(data)]
    else:
        benchmark_data = data[data[grp_name] == benchmark] if benchmark is not None else data

    obs_values = [x for x in data[grp_name].unique().tolist() if x != benchmark]

    res_dict = {benchmark_display_name: 0} if benchmark_display_name is not None else {}
    detail_dict = {}

    for obs_value in obs_values:
        obs_data = data[data[grp_name] == obs_value]

        if return_details:
            psi, details = calculate_psi(
                benchmark_data, 
                obs_data, 
                target_col=target_col, 
                buckets=buckets,
                equal_freq=equal_freq, 
                return_details=True, 
                min_bin_prop=min_bin_prop, 
                content=content, 
                precision=precision
            )
            res_dict[obs_value] = psi
            detail_dict[obs_value] = details
        else:
            psi = calculate_psi(
                benchmark_data, 
                obs_data, 
                target_col=target_col, 
                buckets=buckets,
                equal_freq=equal_freq, 
                return_details=False, 
                min_bin_prop=min_bin_prop, 
                content=content, 
                precision=precision
            )
            res_dict[obs_value] = psi

    fnl_res = pd.DataFrame(res_dict, index=['psi']).T\
        .reset_index(drop=False)\
        .rename(columns={"index": grp_name})

    if return_details:
        return {"psi": fnl_res, "details": detail_dict}
    else:
        return fnl_res

calculate_psi_within_dataset

calculate_psi_within_dataset(data: DataFrame, grp_name: str, varlist: List[str], benchmark: Optional[Any] = None, equal_freq: bool = True, buckets: int = 10, min_bin_prop: float = 0.05, content: float = 1e-06, precision: int = 5) -> DataFrame

Calculate PSI for multiple variables within a dataset, comparing groups to a benchmark.

This function iterates over a list of variables and calculates PSI for each, combining results into a single DataFrame.

Parameters:

Name | Type | Description | Default
data | DataFrame | Input dataset. | required
grp_name | str | Column name for grouping. | required
varlist | list | List of variable names to calculate PSI for. | required
benchmark | str or callable | Benchmark group value, or a function that takes the DataFrame and returns a boolean row mask. | None
equal_freq | bool | Use equal-frequency binning. | True
buckets | int | Number of bins. | 10
min_bin_prop | float | Minimum proportion for each bin. | 0.05
content | float | Small value to avoid division by zero. | 1e-06
precision | int | Decimal precision. | 5

Returns:

Type | Description
DataFrame | Combined PSI results for all variables in long format, with each variable's rows sorted by group.

Example:

>>> variables = ['score', 'age', 'income']
>>> psi_df = calculate_psi_within_dataset(data, 'month', variables, benchmark='2024-01')
Source code in Modeling_Tool/Feature/PSI_Tool.py
def calculate_psi_within_dataset(
    data: pd.DataFrame,
    grp_name: str,
    varlist: List[str],
    benchmark: Optional[Any] = None,
    equal_freq: bool = True,
    buckets: int = 10,
    min_bin_prop: float = 0.05,
    content: float = 1e-6,
    precision: int = 5
) -> pd.DataFrame:
    """
    Calculate PSI for multiple variables within a dataset, comparing groups to a benchmark.

    This function iterates over a list of variables and calculates PSI for each,
    combining results into a single DataFrame.

    Parameters
    ----------
    data : pandas.DataFrame
        Input dataset.
    grp_name : str
        Column name for grouping.
    varlist : list
        List of variable names to calculate PSI for.
    benchmark : str or callable, optional
        Benchmark group value or filter function.
    equal_freq : bool, optional
        Use equal frequency binning. Default is True.
    buckets : int, optional
        Number of bins. Default is 10.
    min_bin_prop : float, optional
        Minimum proportion for each bin. Default is 0.05.
    content : float, optional
        Small value to avoid division by zero. Default is 1e-6.
    precision : int, optional
        Decimal precision. Default is 5.

    Returns
    -------
    pandas.DataFrame
        Combined PSI results for all variables, sorted by group.

    Examples
    --------
    >>> variables = ['score', 'age', 'income']
    >>> psi_df = calculate_psi_within_dataset(data, 'month', variables, benchmark='2024-01')
    """
    fnl_psi_res = []
    for var in tqdm(varlist):
        single_psi = calculate_within_psi(
            data=data, 
            grp_name=grp_name, 
            benchmark=benchmark, 
            target_col=var, 
            buckets=buckets, 
            equal_freq=equal_freq, 
            return_details=False, 
            min_bin_prop=min_bin_prop, 
            content=content, 
            precision=precision
        ).sort_values([grp_name]).reset_index(drop=True)

        single_psi['var'] = var
        fnl_psi_res.append(single_psi)

    return pd.concat(fnl_psi_res)
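
The function above returns a long table with one row per variable and group. A minimal sketch of reshaping such a result into a variable-by-group matrix is shown below. The frame `long_psi` is hypothetical data shaped like the documented output (a group column, `psi`, and `var`); the numbers are invented for illustration.

```python
import pandas as pd

# Hypothetical result shaped like the long output: group column, 'psi', 'var'
long_psi = pd.DataFrame({
    "month": ["2024-02", "2024-03", "2024-02", "2024-03"],
    "psi": [0.012, 0.034, 0.150, 0.310],
    "var": ["score", "score", "income", "income"],
})

# One row per variable, one column per group
wide_psi = long_psi.pivot(index="var", columns="month", values="psi")

# Rank variables by their worst (largest) PSI across groups
worst = wide_psi.max(axis=1).sort_values(ascending=False)
```

Because `pivot` works on columns rather than the index, the repeated index values left by `pd.concat` do not matter here.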

calculate_multivar_psi_two_sets

calculate_multivar_psi_two_sets(expected_df: DataFrame, actual_df: DataFrame, varlist: List[str], group_by: Optional[Union[str, List[str]]] = None, buckets: int = 10, equal_freq: bool = True, min_bin_prop: float = 0.05, content: float = 1e-06, precision: int = 5) -> DataFrame

Calculate PSI for multiple variables by comparing two different datasets.

This function computes PSI for each variable in varlist between expected and actual DataFrames.

Parameters:

Name | Type | Description | Default
expected_df | DataFrame | Expected/baseline dataset. | required
actual_df | DataFrame | Actual/comparison dataset. | required
varlist | list | List of variable names to calculate PSI for. | required
group_by | str or list | Column(s) to group by. | None
buckets | int | Number of bins. | 10
equal_freq | bool | Use equal-frequency binning. | True
min_bin_prop | float | Minimum proportion for each bin. | 0.05
content | float | Small value to avoid division by zero. | 1e-06
precision | int | Decimal precision. | 5

Returns:

Type | Description
DataFrame | PSI results for all variables with 'var' and 'psi' columns.

Example:

>>> variables = ['score', 'age', 'income']
>>> psi_df = calculate_multivar_psi_two_sets(train_df, production_df, variables)
Source code in Modeling_Tool/Feature/PSI_Tool.py
def calculate_multivar_psi_two_sets(
    expected_df: pd.DataFrame,
    actual_df: pd.DataFrame,
    varlist: List[str],
    group_by: Optional[Union[str, List[str]]] = None,
    buckets: int = 10,
    equal_freq: bool = True,
    min_bin_prop: float = 0.05,
    content: float = 1e-6,
    precision: int = 5
) -> pd.DataFrame:
    """
    Calculate PSI for multiple variables by comparing two different datasets.

    This function computes PSI for each variable in varlist between expected
    and actual DataFrames.

    Parameters
    ----------
    expected_df : pandas.DataFrame
        Expected/baseline dataset.
    actual_df : pandas.DataFrame
        Actual/comparison dataset.
    varlist : list
        List of variable names to calculate PSI for.
    group_by : str or list, optional
        Column(s) to group by. Default is None.
    buckets : int, optional
        Number of bins. Default is 10.
    equal_freq : bool, optional
        Use equal frequency binning. Default is True.
    min_bin_prop : float, optional
        Minimum proportion for each bin. Default is 0.05.
    content : float, optional
        Small value to avoid division by zero. Default is 1e-6.
    precision : int, optional
        Decimal precision. Default is 5.

    Returns
    -------
    pandas.DataFrame
        PSI results for all variables with 'var' and 'psi' columns.

    Examples
    --------
    >>> variables = ['score', 'age', 'income']
    >>> psi_df = calculate_multivar_psi_two_sets(train_df, production_df, variables)
    """
    multi_psi_res = []
    for var in tqdm(varlist):
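        # Forward every binning option; otherwise equal_freq, min_bin_prop,
        # content and precision would be accepted by this function but ignored.
        single_psi = calculate_psi(
            expected=expected_df,
            actual=actual_df,
            target_col=var,
            group_by=group_by,
            buckets=buckets,
            equal_freq=equal_freq,
            return_details=False,
            min_bin_prop=min_bin_prop,
            content=content,
            precision=precision
        )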
        if group_by is None:
            single_psi = pd.DataFrame([single_psi], columns=['psi'])
        single_psi['var'] = var
        multi_psi_res.append(single_psi)

    return pd.concat(multi_psi_res)
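
For orientation, the quantity computed per variable is the standard PSI, the sum over bins of (actual - expected) * ln(actual / expected). The sketch below applies that formula to already-binned proportions. How PSI_Tool applies `content` internally is not shown on this page, so flooring the proportions at `content` is an assumption, and `psi_from_proportions` is a hypothetical helper name.

```python
import numpy as np

def psi_from_proportions(expected_pct, actual_pct, content=1e-6):
    """PSI for two aligned vectors of bin proportions."""
    e = np.asarray(expected_pct, dtype=float)
    a = np.asarray(actual_pct, dtype=float)
    # Assumption: floor each proportion at `content` so that empty bins
    # do not produce log(0) or a division by zero.
    e = np.clip(e, content, None)
    a = np.clip(a, content, None)
    return float(np.sum((a - e) * np.log(a / e)))

same = psi_from_proportions([0.25, 0.25, 0.25, 0.25], [0.25, 0.25, 0.25, 0.25])
shifted = psi_from_proportions([0.5, 0.5], [0.4, 0.6])
with_empty_bin = psi_from_proportions([0.5, 0.5, 0.0], [0.4, 0.5, 0.1])
```

Identical distributions give a PSI of zero, and each bin contributes a non-negative term, so the index grows as the two distributions move apart.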

calculate_multigroup_psi_two_sets

calculate_multigroup_psi_two_sets(expected_df: DataFrame, actual_df: DataFrame, varlist: List[str], group_by: Optional[Union[str, List[str]]] = None, buckets: int = 10, equal_freq: bool = True, min_bin_prop: float = 0.05, content: float = 1e-06, precision: int = 5, group_name: Optional[str] = None, return_details: bool = False) -> Union[DataFrame, Dict[str, DataFrame]]

Calculate grouped PSI using expected DataFrame as benchmark, applied to actual DataFrame groups.

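Parameters:

Name | Type | Description | Default
expected_df | DataFrame | Baseline (expected) dataset. | required
actual_df | DataFrame | Actual (comparison) dataset. | required
varlist | list | List of variable names to calculate PSI for. | required
group_by | str or list | Column(s) to group by; PSI is calculated separately for each group. | None
buckets | int | Number of bins. | 10
equal_freq | bool | Whether to use equal-frequency binning. | True
min_bin_prop | float | Minimum proportion for each bin. | 0.05
content | float | Small value to avoid division by zero. | 1e-06
precision | int | Decimal precision. | 5
group_name | str | Column in actual_df used to split it into groups for the multi-group calculation. | None
return_details | bool | Whether to return per-bin details. If True, returns a dict {'psi': psi_df, 'details': details_df}, where details_df has the columns ['bin', 'expected_count', 'actual_count', 'expected_percent', 'actual_percent', 'psi_component', group_name, 'var']; the group_name column is present only when group_name is given. | False
Source code in Modeling_Tool/Feature/PSI_Tool.py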
def calculate_multigroup_psi_two_sets(
    expected_df: pd.DataFrame,
    actual_df: pd.DataFrame,
    varlist: List[str],
    group_by: Optional[Union[str, List[str]]] = None,
    buckets: int = 10,
    equal_freq: bool = True,
    min_bin_prop: float = 0.05,
    content: float = 1e-6,
    precision: int = 5,
    group_name: Optional[str] = None,
    return_details: bool = False
) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]:
    """
    Calculate grouped PSI using expected DataFrame as benchmark, applied to actual DataFrame groups.

    Parameters
    ----------
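    expected_df : pandas.DataFrame
        Baseline (expected) dataset.
    actual_df : pandas.DataFrame
        Actual (comparison) dataset.
    varlist : list
        List of variable names to calculate PSI for.
    group_by : str or list, optional
        Column(s) to group by; PSI is calculated separately for each group. Default is None.
    buckets : int, optional
        Number of bins. Default is 10.
    equal_freq : bool, optional
        Whether to use equal-frequency binning. Default is True.
    min_bin_prop : float, optional
        Minimum proportion for each bin. Default is 0.05.
    content : float, optional
        Small value to avoid division by zero. Default is 1e-6.
    precision : int, optional
        Decimal precision. Default is 5.
    group_name : str, optional
        Column in actual_df used to split it into groups for the multi-group calculation. Default is None.
    return_details : bool, optional
        Whether to return per-bin details. If True, returns a dict {'psi': psi_df, 'details': details_df},
        where details_df has the columns ['bin', 'expected_count', 'actual_count', 'expected_percent',
        'actual_percent', 'psi_component', group_name, 'var'] (group_name only when it is given).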
    """
    if group_name is not None:
        if actual_df[group_name].isna().sum() > 0:
            actual_df = actual_df.copy()
            actual_df[group_name] = actual_df[group_name].fillna('__NULL__')

        if return_details:
            psi_records = []
            detail_records = []

            for group, group_data in actual_df.groupby(group_name):
                for var in tqdm(varlist, desc=f"Processing group {group}"):
                    psi_val, detail_df = calculate_psi(
                        expected=expected_df,
                        actual=group_data,
                        target_col=var,
                        group_by=group_by,
                        buckets=buckets,
                        equal_freq=equal_freq,
                        return_details=True,
                        min_bin_prop=min_bin_prop,
                        content=content,
                        precision=precision
                    )

                    psi_records.append({group_name: group, 'var': var, 'psi': psi_val})

                    # ========== Normalize detail_df (core logic after the fix) ==========
                    detail_df = _coerce_psi_detail_frame(detail_df, group_by=group_by)
                    if detail_df is not None and not detail_df.empty:
                        if 'bin' not in detail_df.columns:
                            # Reset the index; the former index column may be named 'index' or something else
                            detail_df = detail_df.reset_index()
                            # After reset_index, the first column is the former index
                            index_col = detail_df.columns[0]
                            detail_df = detail_df.rename(columns={index_col: 'bin'})

                        detail_df[group_name] = group
                        detail_df['var'] = var

                        required_cols = ['bin', 'expected_count', 'actual_count', 'expected_percent', 'actual_percent', 'psi_component', group_name, 'var']
                        for col in required_cols:
                            if col not in detail_df.columns:
                                detail_df[col] = np.nan
                        detail_df = detail_df[required_cols]
                        detail_records.append(detail_df)
                    # ======================================================

            psi_df = pd.DataFrame(psi_records)
            details_df = pd.concat(detail_records, ignore_index=True) if detail_records else pd.DataFrame(columns=['bin', 'expected_count', 'actual_count', 'expected_percent', 'actual_percent', 'psi_component', group_name, 'var'])
            return {'psi': psi_df, 'details': details_df}
        else:
            # group_name given but no details requested: compute one PSI per group
            # and variable. Without this branch the function falls through and
            # returns None for this argument combination.
            psi_records = []
            for group, group_data in actual_df.groupby(group_name):
                for var in tqdm(varlist, desc=f"Processing group {group}"):
                    psi_val = calculate_psi(
                        expected=expected_df,
                        actual=group_data,
                        target_col=var,
                        group_by=group_by,
                        buckets=buckets,
                        equal_freq=equal_freq,
                        return_details=False,
                        min_bin_prop=min_bin_prop,
                        content=content,
                        precision=precision
                    )
                    psi_records.append({group_name: group, 'var': var, 'psi': psi_val})
            return pd.DataFrame(psi_records)

    else:
        # When group_name is not specified
        if return_details:
            # Supports multiple variables
            psi_records = []
            detail_records = []
            for var in tqdm(varlist, desc="Calculating PSI with details"):
                psi_val, detail_df = calculate_psi(
                    expected=expected_df,
                    actual=actual_df,
                    target_col=var,
                    group_by=group_by,
                    buckets=buckets,
                    equal_freq=equal_freq,
                    return_details=True,
                    min_bin_prop=min_bin_prop,
                    content=content,
                    precision=precision
                )
                psi_records.append({'var': var, 'psi': psi_val})

                # Normalize detail_df
                detail_df = _coerce_psi_detail_frame(detail_df, group_by=group_by)
                if detail_df is not None and not detail_df.empty:
                    if 'bin' not in detail_df.columns:
                        detail_df = detail_df.reset_index()
                        index_col = detail_df.columns[0]
                        detail_df = detail_df.rename(columns={index_col: 'bin'})
                    detail_df['var'] = var
                    required_cols = ['bin', 'expected_count', 'actual_count', 'expected_percent', 'actual_percent', 'psi_component', 'var']
                    for col in required_cols:
                        if col not in detail_df.columns:
                            detail_df[col] = np.nan
                    detail_df = detail_df[required_cols]
                    detail_records.append(detail_df)

            psi_df = pd.DataFrame(psi_records)
            details_df = pd.concat(detail_records, ignore_index=True) if detail_records else pd.DataFrame(columns=['bin', 'expected_count', 'actual_count', 'expected_percent', 'actual_percent', 'psi_component', 'var'])
            return {'psi': psi_df, 'details': details_df}

        else:

            # No details requested: use the existing batch function
            group_psi = calculate_multivar_psi_two_sets(
                expected_df=expected_df, 
                actual_df=actual_df, 
                varlist=varlist, 
                group_by=group_by,
                buckets=buckets, 
                equal_freq=equal_freq, 
                min_bin_prop=min_bin_prop, 
                content=content, 
                precision=precision
            )

            return group_psi
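
The detail-frame normalization in the listing above adds missing columns one by one and then selects the required order. A minimal sketch of the same result using `DataFrame.reindex` follows. The helper `normalize_detail` and the sample frame are hypothetical, and `_coerce_psi_detail_frame` is not reproduced because its body is not shown on this page.

```python
import pandas as pd

REQUIRED_COLS = ["bin", "expected_count", "actual_count",
                 "expected_percent", "actual_percent", "psi_component", "var"]

def normalize_detail(detail_df, var):
    """Return detail_df with exactly REQUIRED_COLS, in that order."""
    out = detail_df.copy()
    if "bin" not in out.columns:
        # Move the index into a column and call it 'bin', whatever its name was
        out = out.reset_index()
        out = out.rename(columns={out.columns[0]: "bin"})
    out["var"] = var
    # reindex creates any missing column filled with NaN and fixes the order
    return out.reindex(columns=REQUIRED_COLS)

# Hypothetical per-bin details that lack the count columns
raw = pd.DataFrame(
    {"expected_percent": [0.5, 0.5], "actual_percent": [0.4, 0.6],
     "psi_component": [0.0223, 0.0182]},
    index=pd.Index(["(-inf, 0.5]", "(0.5, inf]"], name="score_bin"),
)
detail = normalize_detail(raw, "score")
```

Fixing the column set before concatenation keeps the combined details table rectangular even when an individual variable returns fewer columns.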

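Variable Insights and Correlation: Feature_Insights

Feature_Insights

Variable extraction and correlation analysis toolkit. Provides variable analysis, IV calculation, WOE plotting, and correlation filtering.

VarExtractionInsights

Variable extraction and insight analyzer. Analyzes the variables in a dataset, computes IV values and WOE bins, and supports plotting and variable screening.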

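Parameters:

Name | Type | Description | Default
data | DataFrame | Input raw DataFrame. | required
dep | str | Name of the target (dependent variable) column. | required
plot_path | str | Directory where plots are saved. | required
nbins | int | Number of bins. | 10
equal_freq | bool | Whether to use equal-frequency binning. | True
min_bin_prop | float | Minimum sample proportion for each bin. | 0.05
precision | int | Decimal precision for WOE and IV calculations. | 5
chi2_method | bool | Whether to use chi-square binning. | False
chi2_p | float | p-value threshold for the chi-square test. | 0.9
init_equi_bins | int | Number of initial equal-frequency bins. | 5000
tree_binning | bool | Whether to use decision-tree binning. | True
include_missing | bool | Whether to treat missing values as a separate bin. | True
seed | int | Random seed. | 3407
missing_rate_ref | int / float | Reference value used to fill missing values. | -999999
spec_values | list | List of special values. None is treated as an empty list. | None

Example:

>>> analyzer = VarExtractionInsights(df, 'target', '/path/to/plots')
>>> report = analyzer.get_var_analysis_report(df, ['var1', 'var2'])
Source code in Modeling_Tool/Feature/Feature_Insights.py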
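class VarExtractionInsights:
    """Variable extraction and insight analyzer.
    Analyzes the variables in a dataset, computes IV values and WOE bins,
    and supports plotting and variable screening.

    Parameters
    ----------
    data : pd.DataFrame
        Input raw DataFrame
    dep : str
        Name of the target (dependent variable) column
    plot_path : str
        Directory where plots are saved
    nbins : int, optional
        Number of bins. Default is 10
    equal_freq : bool, optional
        Whether to use equal-frequency binning. Default is True
    min_bin_prop : float, optional
        Minimum sample proportion for each bin. Default is 0.05
    precision : int, optional
        Decimal precision for WOE and IV calculations. Default is 5
    chi2_method : bool, optional
        Whether to use chi-square binning. Default is False
    chi2_p : float, optional
        p-value threshold for the chi-square test. Default is 0.9
    init_equi_bins : int, optional
        Number of initial equal-frequency bins. Default is 5000
    tree_binning : bool, optional
        Whether to use decision-tree binning. Default is True
    include_missing : bool, optional
        Whether to treat missing values as a separate bin. Default is True
    seed : int, optional
        Random seed. Default is 3407
    missing_rate_ref : int/float, optional
        Reference value used to fill missing values. Default is -999999
    spec_values : list, optional
        List of special values. Default is None, which is treated as an empty list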

    Examples
    --------
    >>> analyzer = VarExtractionInsights(df, 'target', '/path/to/plots')
    >>> report = analyzer.get_var_analysis_report(df, ['var1', 'var2'])
    """

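    def __init__(self, data, dep, plot_path,
                 nbins=10, equal_freq=True, min_bin_prop=0.05, precision=5, chi2_method=False, chi2_p=0.9,
                 init_equi_bins=5000, tree_binning=True, include_missing=True, seed=3407, missing_rate_ref=-999999, spec_values=None):
        """Initialize the variable extraction and insight analyzer.

        Parameters
        ----------
        data : pd.DataFrame
            Input raw DataFrame
        dep : str
            Name of the target (dependent variable) column
        plot_path : str
            Directory where plots are saved
        nbins : int, optional
            Number of bins
        equal_freq : bool, optional
            Whether to use equal-frequency binning
        min_bin_prop : float, optional
            Minimum sample proportion for each bin
        precision : int, optional
            Decimal precision for WOE and IV calculations
        chi2_method : bool, optional
            Whether to use chi-square binning
        chi2_p : float, optional
            p-value threshold for the chi-square test
        init_equi_bins : int, optional
            Number of initial equal-frequency bins
        tree_binning : bool, optional
            Whether to use decision-tree binning
        include_missing : bool, optional
            Whether to treat missing values as a separate bin
        seed : int, optional
            Random seed
        missing_rate_ref : int/float, optional
            Reference value used to fill missing values
        spec_values : list, optional
            List of special values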
        """
        self.data = data
        self.dep = dep
        self.plot_path = plot_path

        self.nbins = nbins
        self.equal_freq = equal_freq
        self.min_bin_prop = min_bin_prop
        self.precision = precision
        self.chi2_method = chi2_method
        self.chi2_p = chi2_p
        self.init_equi_bins = init_equi_bins
        self.tree_binning = tree_binning
        self.include_missing = include_missing
        self.seed = seed
        self.missing_rate_ref = missing_rate_ref
        self.spec_values = spec_values if spec_values is not None else []

    @staticmethod
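    def remove_folder(file_path):
        """Delete the specified folder.

        Recursively deletes the folder at the given path and all of its contents.
        Any error, including a missing folder, is silently ignored.

        Parameters
        ----------
        file_path : str
            Path of the folder to delete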

        Examples
        --------
        >>> VarExtractionInsights.remove_folder('/path/to/folder')
        """
        import shutil
        try:
            shutil.rmtree(file_path)
        except Exception:
            pass

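    def get_var_analysis_report(self, data, varlist, dep=None, iv_cut=0.01):
        """Generate a variable analysis report.

        Computes IV, KS, Lift and related metrics for the given variables
        and returns a summary of the variables that meet the IV threshold.

        Parameters
        ----------
        data : pd.DataFrame
            Input raw DataFrame
        varlist : list
            List of variable names to analyze
        dep : str, optional
            Target column name. Default is None, in which case the dep given at initialization is used
        iv_cut : float, optional
            IV threshold for keeping a variable. Default is 0.01

        Returns
        -------
        pd.DataFrame
            Summary table of the variable analysis results, with the columns:
            - var: variable name
            - n_all: total number of samples
            - n: number of non-missing samples
            - ks_in_gains: KS statistic
            - lift_in_gains: Lift value
            - iv: IV value
            - n_bump: number of bins
            - missing_rate: missing rate
            - min, mean, max: summary statistics
            - n_bins: number of bins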

        Examples
        --------
        >>> analyzer = VarExtractionInsights(df, 'target', '/path/to/plots')
        >>> report = analyzer.get_var_analysis_report(df, ['var1', 'var2'])
        """
        if dep is None:
            dep = self.dep

        from Modeling_Tool.Eval.Model_Eval_Tool import get_gains_table

        iv_info_res = []
        for var in tqdm(varlist):
            if data[var].nunique() > 1:
                try:
                    attr_iv = get_gains_table(
                        data=data,
                        dep=dep,
                        nbins=self.nbins,
                        precision=self.precision,
                        min_bin_prop=self.min_bin_prop,
                        include_missing=self.include_missing,
                        score=var,
                        equal_freq=self.equal_freq,
                        chi2_method=self.chi2_method,
                        chi2_p=self.chi2_p,
                        init_equi_bins=self.init_equi_bins,
                        fillna=self.missing_rate_ref,
                        spec_values=self.spec_values,
                        retSummary=True,
                        tree_binning=self.tree_binning,
                        random_state=self.seed,
                        ascending=True,
                    )

                    attr_iv['var'] = var
                    iv_info_res.append(attr_iv)

                except TypeError:
                    continue

        iv_info_res = pd.concat(iv_info_res).sort_values("IV", ascending=False)

        high_iv_summary = iv_info_res.query(f"IV >= {iv_cut}").round(4)
        high_iv_varlist = high_iv_summary['var'].tolist()

        means = proc_means_by_grp(data, high_iv_varlist, spec_missing_value=self.missing_rate_ref)

        if len(high_iv_varlist) == 0:
            logger.info(f"WARNING: No variable with IV >= {iv_cut}")
            means = means.rename(columns={"index": "attribute"})

        fnl_summary = high_iv_summary.merge(
            means[['attribute', 'N_ALL', 'N', 'MISSING_RATE', 'MIN', 'MEAN', 'MAX']],
            left_on=['var'],
            right_on=['attribute'],
            how='left'
        )
        fnl_summary.columns = [x.lower() for x in fnl_summary.columns]
        fnl_summary = fnl_summary[[
            'var', 'n_all', 'n', 'ks_in_gains', 'lift_in_gains', 'iv',
            'n_bump', 'missing_rate', 'min', 'mean', 'max', 'n_bins'
        ]]

        return fnl_summary

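    def plot_woe(self, data, varlist, plot_group=None, plot_dirname="var_analysis_plot", plot_path=None):
        """Plot WOE distribution charts.

        Computes WOE values for the given variables, plots their distributions,
        and saves the charts to the specified directory.

        Parameters
        ----------
        data : pd.DataFrame
            Input raw DataFrame
        varlist : list
            List of variable names to plot
        plot_group : str, optional
            Name of the grouping variable. Default is None
        plot_dirname : str, optional
            Name of the subdirectory where plots are saved. Default is "var_analysis_plot"
        plot_path : str, optional
            Root directory for saved plots. Default is None, in which case the plot_path given at initialization is used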

        Returns
        -------
        None

        Examples
        --------
        >>> analyzer = VarExtractionInsights(df, 'target', '/path/to/plots')
        >>> analyzer.plot_woe(df, ['var1', 'var2'])
        """

        if plot_path is None:
            plot_path = self.plot_path

        from Modeling_Tool.WOE.WOE_Master import WOE_Master

        # Fill Missing Value.
        drv_fillna = data.copy()
        drv_fillna[varlist] = drv_fillna[varlist].fillna(self.missing_rate_ref)

        woe_master = WOE_Master(
            train_data=drv_fillna,
            varlist=varlist,
            dep=self.dep,
            graph_save_dir=plot_path
        )

        woe_master.fit(
            nbins=self.nbins,
            equal_freq=self.equal_freq,
            min_bin_prop=self.min_bin_prop,
            precision=self.precision,
            chi2_config=(self.init_equi_bins, self.chi2_p) if self.chi2_method else None,
            tree_binning_seed=self.seed if self.tree_binning else None,
            include_missing=self.include_missing,
            spec_values=self.spec_values
        )

        train_woe = woe_master.transform(drv_fillna)
        woe_master.plot_bivar_graph(train_woe, group=plot_group, dirname=plot_dirname)

remove_folder staticmethod

remove_folder(file_path)

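Delete the specified folder.

Recursively deletes the folder at the given path and all of its contents. Any error, including a missing folder, is silently ignored.

Parameters:

Name | Type | Description | Default
file_path | str | Path of the folder to delete. | required

Example:

>>> VarExtractionInsights.remove_folder('/path/to/folder')
Source code in Modeling_Tool/Feature/Feature_Insights.py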
@staticmethod
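def remove_folder(file_path):
    """Delete the specified folder.

    Recursively deletes the folder at the given path and all of its contents.
    Any error, including a missing folder, is silently ignored.

    Parameters
    ----------
    file_path : str
        Path of the folder to delete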

    Examples
    --------
    >>> VarExtractionInsights.remove_folder('/path/to/folder')
    """
    import shutil
    try:
        shutil.rmtree(file_path)
    except Exception:
        pass
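
The listing above swallows every exception, so permission problems or a wrong path type also pass silently. The sketch below shows a narrower variant that ignores only a missing folder. The function name `remove_folder_if_present` is hypothetical and this is not the library's implementation.

```python
import os
import shutil
import tempfile

def remove_folder_if_present(folder_path):
    """Delete a folder tree; return False if the folder did not exist."""
    try:
        shutil.rmtree(folder_path)
    except FileNotFoundError:
        # Missing folder: nothing to do. Other errors still propagate.
        return False
    return True

# Demonstration inside a throwaway temporary directory
tmp_root = tempfile.mkdtemp()
target = os.path.join(tmp_root, "plots")
os.makedirs(os.path.join(target, "nested"))

first = remove_folder_if_present(target)   # folder exists and is removed
second = remove_folder_if_present(target)  # folder is already gone
shutil.rmtree(tmp_root)
```

When suppressing every error really is intended, `shutil.rmtree(path, ignore_errors=True)` expresses that without a `try` block.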

get_var_analysis_report

get_var_analysis_report(data, varlist, dep=None, iv_cut=0.01)

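Generate a variable analysis report.

Computes IV, KS, Lift and related metrics for the given variables and returns a summary of the variables that meet the IV threshold.

Parameters:

Name | Type | Description | Default
data | DataFrame | Input raw DataFrame. | required
varlist | list | List of variable names to analyze. | required
dep | str | Target column name. If None, the dep given at initialization is used. | None
iv_cut | float | IV threshold for keeping a variable. | 0.01

Returns:

Type | Description
DataFrame | Summary table of the variable analysis results, with the columns listed below.

- var: variable name
- n_all: total number of samples
- n: number of non-missing samples
- ks_in_gains: KS statistic
- lift_in_gains: Lift value
- iv: IV value
- n_bump: number of bins
- missing_rate: missing rate
- min, mean, max: summary statistics
- n_bins: number of bins

Example:

>>> analyzer = VarExtractionInsights(df, 'target', '/path/to/plots')
>>> report = analyzer.get_var_analysis_report(df, ['var1', 'var2'])
Source code in Modeling_Tool/Feature/Feature_Insights.py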
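def get_var_analysis_report(self, data, varlist, dep=None, iv_cut=0.01):
    """Generate a variable analysis report.

    Computes IV, KS, Lift and related metrics for the given variables
    and returns a summary of the variables that meet the IV threshold.

    Parameters
    ----------
    data : pd.DataFrame
        Input raw DataFrame
    varlist : list
        List of variable names to analyze
    dep : str, optional
        Target column name. Default is None, in which case the dep given at initialization is used
    iv_cut : float, optional
        IV threshold for keeping a variable. Default is 0.01

    Returns
    -------
    pd.DataFrame
        Summary table of the variable analysis results, with the columns:
        - var: variable name
        - n_all: total number of samples
        - n: number of non-missing samples
        - ks_in_gains: KS statistic
        - lift_in_gains: Lift value
        - iv: IV value
        - n_bump: number of bins
        - missing_rate: missing rate
        - min, mean, max: summary statistics
        - n_bins: number of bins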

    Examples
    --------
    >>> analyzer = VarExtractionInsights(df, 'target', '/path/to/plots')
    >>> report = analyzer.get_var_analysis_report(df, ['var1', 'var2'])
    """
    if dep is None:
        dep = self.dep

    from Modeling_Tool.Eval.Model_Eval_Tool import get_gains_table

    iv_info_res = []
    for var in tqdm(varlist):
        if data[var].nunique() > 1:
            try:
                attr_iv = get_gains_table(
                    data=data,
                    dep=dep,
                    nbins=self.nbins,
                    precision=self.precision,
                    min_bin_prop=self.min_bin_prop,
                    include_missing=self.include_missing,
                    score=var,
                    equal_freq=self.equal_freq,
                    chi2_method=self.chi2_method,
                    chi2_p=self.chi2_p,
                    init_equi_bins=self.init_equi_bins,
                    fillna=self.missing_rate_ref,
                    spec_values=self.spec_values,
                    retSummary=True,
                    tree_binning=self.tree_binning,
                    random_state=self.seed,
                    ascending=True,
                )

                attr_iv['var'] = var
                iv_info_res.append(attr_iv)

            except TypeError:
                continue

    iv_info_res = pd.concat(iv_info_res).sort_values("IV", ascending=False)

    high_iv_summary = iv_info_res.query(f"IV >= {iv_cut}").round(4)
    high_iv_varlist = high_iv_summary['var'].tolist()

    means = proc_means_by_grp(data, high_iv_varlist, spec_missing_value=self.missing_rate_ref)

    if len(high_iv_varlist) == 0:
        logger.info(f"WARNING: No variable with IV >= {iv_cut}")
        means = means.rename(columns={"index": "attribute"})

    fnl_summary = high_iv_summary.merge(
        means[['attribute', 'N_ALL', 'N', 'MISSING_RATE', 'MIN', 'MEAN', 'MAX']],
        left_on=['var'],
        right_on=['attribute'],
        how='left'
    )
    fnl_summary.columns = [x.lower() for x in fnl_summary.columns]
    fnl_summary = fnl_summary[[
        'var', 'n_all', 'n', 'ks_in_gains', 'lift_in_gains', 'iv',
        'n_bump', 'missing_rate', 'min', 'mean', 'max', 'n_bins'
    ]]

    return fnl_summary
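
The report ranks variables by IV, which `get_gains_table` computes internally; that function's body is not shown on this page. As a rough illustration only, the sketch below derives WOE and IV from already-binned data using the common convention WOE = ln(good share / bad share). The helper `woe_iv_table` and the sample data are hypothetical.

```python
import numpy as np
import pandas as pd

def woe_iv_table(bins, target):
    """Per-bin WOE and total IV for a binary target (1 = bad, 0 = good)."""
    df = pd.DataFrame({"bin": bins, "bad": target})
    table = df.groupby("bin")["bad"].agg(n="count", bad="sum")
    table["good"] = table["n"] - table["bad"]
    bad_pct = table["bad"] / table["bad"].sum()
    good_pct = table["good"] / table["good"].sum()
    # Assumed sign convention: positive WOE means the bin holds relatively more goods
    table["woe"] = np.log(good_pct / bad_pct)
    table["iv_component"] = (good_pct - bad_pct) * table["woe"]
    return table, float(table["iv_component"].sum())

# Hypothetical data: bin A has 1 bad out of 4, bin B has 3 bad out of 4
bins = ["A"] * 4 + ["B"] * 4
target = [1, 0, 0, 0, 1, 1, 1, 0]
table, iv = woe_iv_table(bins, target)
```

A bin with zero goods or zero bads makes the logarithm infinite in this sketch, which is one reason production code enforces a minimum bin proportion or adds a small smoothing term.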

plot_woe

plot_woe(data, varlist, plot_group=None, plot_dirname='var_analysis_plot', plot_path=None)

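Plot WOE distribution charts.

Computes WOE values for the given variables, plots their distributions, and saves the charts to the specified directory.

Parameters:

Name | Type | Description | Default
data | DataFrame | Input raw DataFrame. | required
varlist | list | List of variable names to plot. | required
plot_group | str | Name of the grouping variable. | None
plot_dirname | str | Name of the subdirectory where plots are saved. | 'var_analysis_plot'
plot_path | str | Root directory for saved plots. If None, the plot_path given at initialization is used. | None

Returns:

Type | Description
None |

Example:

>>> analyzer = VarExtractionInsights(df, 'target', '/path/to/plots')
>>> analyzer.plot_woe(df, ['var1', 'var2'])
Source code in Modeling_Tool/Feature/Feature_Insights.py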
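def plot_woe(self, data, varlist, plot_group=None, plot_dirname="var_analysis_plot", plot_path=None):
    """Plot WOE distribution charts.

    Computes WOE values for the given variables, plots their distributions,
    and saves the charts to the specified directory.

    Parameters
    ----------
    data : pd.DataFrame
        Input raw DataFrame
    varlist : list
        List of variable names to plot
    plot_group : str, optional
        Name of the grouping variable. Default is None
    plot_dirname : str, optional
        Name of the subdirectory where plots are saved. Default is "var_analysis_plot"
    plot_path : str, optional
        Root directory for saved plots. Default is None, in which case the plot_path given at initialization is used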

    Returns
    -------
    None

    Examples
    --------
    >>> analyzer = VarExtractionInsights(df, 'target', '/path/to/plots')
    >>> analyzer.plot_woe(df, ['var1', 'var2'])
    """

    if plot_path is None:
        plot_path = self.plot_path

    from Modeling_Tool.WOE.WOE_Master import WOE_Master

    # Fill Missing Value.
    drv_fillna = data.copy()
    drv_fillna[varlist] = drv_fillna[varlist].fillna(self.missing_rate_ref)

    woe_master = WOE_Master(
        train_data=drv_fillna,
        varlist=varlist,
        dep=self.dep,
        graph_save_dir=plot_path
    )

    woe_master.fit(
        nbins=self.nbins,
        equal_freq=self.equal_freq,
        min_bin_prop=self.min_bin_prop,
        precision=self.precision,
        chi2_config=(self.init_equi_bins, self.chi2_p) if self.chi2_method else None,
        tree_binning_seed=self.seed if self.tree_binning else None,
        include_missing=self.include_missing,
        spec_values=self.spec_values
    )

    train_woe = woe_master.transform(drv_fillna)
    woe_master.plot_bivar_graph(train_woe, group=plot_group, dirname=plot_dirname)
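
Before fitting, the listing above fills missing values with a sentinel on a copy of the input, so the caller's frame is left untouched. A minimal sketch of that step on toy data follows. The constant name `MISSING_SENTINEL` and the sample frame are hypothetical; the value mirrors the default `missing_rate_ref`.

```python
import numpy as np
import pandas as pd

MISSING_SENTINEL = -999999  # mirrors the default missing_rate_ref

# Hypothetical input with gaps in both variables
raw = pd.DataFrame({
    "var1": [1.0, np.nan, 3.0],
    "var2": [np.nan, 2.0, 2.5],
    "target": [0, 1, 0],
})
varlist = ["var1", "var2"]

# Work on a copy so the caller's frame keeps its NaN values
filled = raw.copy()
filled[varlist] = filled[varlist].fillna(MISSING_SENTINEL)
```

The sentinel only works as a missing-value marker if it lies outside the real range of every variable in `varlist`.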

CorrelationFilter

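Correlation filter.

Screens out and removes highly correlated variables based on correlation analysis, with support for IV comparison and iterative filtering.

Parameters:

Name | Type | Description | Default
data | DataFrame | Input raw DataFrame. | required
dep | str | Name of the target (dependent variable) column. | required
corr_cutpoint | float | Correlation coefficient threshold; variable pairs above it are filtered. | 0.8
method | str | Correlation method: 'pearson', 'spearman', or 'kendall'. | 'pearson'

Example: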

>>> filter_analyzer = CorrelationFilter(df, 'target')
>>> keep_vars = filter_analyzer.remove_highly_correlated(['var1', 'var2'])
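
To make the idea concrete, the sketch below implements a simplified greedy filter: rank variables by a metric such as IV and keep a variable only if it is not highly correlated with one already kept. This is not the class's actual algorithm, which works from the VAR1/VAR2 pairs returned by `var_corr_filter`; the function `greedy_corr_filter`, the synthetic data, and the IV values are all hypothetical.

```python
import numpy as np
import pandas as pd

def greedy_corr_filter(data, varlist, metric, corr_cutpoint=0.8, method="pearson"):
    """Keep the best-ranked variable from each group of highly correlated ones."""
    corr = data[varlist].corr(method=method).abs()
    # Visit variables from strongest to weakest so the best one in a cluster wins
    ranked = sorted(varlist, key=lambda v: metric[v], reverse=True)
    kept = []
    for var in ranked:
        if all(corr.loc[var, k] <= corr_cutpoint for k in kept):
            kept.append(var)
    return kept

rng = np.random.default_rng(0)
x = rng.normal(size=200)
df = pd.DataFrame({
    "var1": x,
    "var2": x + rng.normal(scale=0.01, size=200),  # near-duplicate of var1
    "var3": rng.normal(size=200),                   # independent of var1
})
iv = {"var1": 0.30, "var2": 0.25, "var3": 0.10}     # hypothetical IV values
kept = greedy_corr_filter(df, ["var1", "var2", "var3"], iv)
```

Here `var2` is dropped because it is almost identical to the higher-IV `var1`, while the independent `var3` survives despite its lower IV.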
Source code in Modeling_Tool/Feature/Feature_Insights.py
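class CorrelationFilter:
    """Correlation filter.

    Screens out and removes highly correlated variables based on correlation analysis,
    with support for IV comparison and iterative filtering.

    Parameters
    ----------
    data : pd.DataFrame
        Input raw DataFrame
    dep : str
        Name of the target (dependent variable) column
    corr_cutpoint : float, optional
        Correlation coefficient threshold; variable pairs above it are filtered. Default is 0.8
    method : str, optional
        Correlation method: 'pearson', 'spearman', or 'kendall'. Default is 'pearson'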

    Examples
    --------
    >>> filter_analyzer = CorrelationFilter(df, 'target')
    >>> keep_vars = filter_analyzer.remove_highly_correlated(['var1', 'var2'])
    """

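    def __init__(self, data, dep, corr_cutpoint=0.8, method='pearson', tree_binning=False, chi2_method=False,
                 seed=42, chi2_p=0.999, init_equi_bins=1000, missing_rate_ref=-9999999, spec_values=None,
                 base_metric='iv'):
        """Initialize the correlation filter.

        Parameters
        ----------
        data : pd.DataFrame
            Input raw DataFrame
        dep : str
            Name of the target (dependent variable) column
        corr_cutpoint : float, optional
            Correlation coefficient threshold
        method : str, optional
            Correlation method
        tree_binning : bool, optional
            Whether to use decision-tree binning
        chi2_method : bool, optional
            Whether to use chi-square binning
        seed : int, optional
            Random seed for decision-tree binning
        chi2_p : float, optional
            p-value threshold for the chi-square test
        init_equi_bins : int, optional
            Number of initial equal-frequency bins
        missing_rate_ref : int/float, optional
            Reference value used to fill missing values
        spec_values : list, optional
            List of special values; None is treated as an empty list
        base_metric : str, optional
            Metric used to rank correlated variables, 'iv' or 'ks'
        """
        self.data = data
        self.dep = dep
        self.corr_cutpoint = corr_cutpoint
        self.method = method
        self.tree_binning = tree_binning
        self.chi2_method = chi2_method
        self.seed = seed
        self.chi2_p = chi2_p
        self.init_equi_bins = init_equi_bins
        self.missing_rate_ref = missing_rate_ref
        # Avoid a shared mutable default: build a fresh list per instance
        self.spec_values = spec_values if spec_values is not None else []
        self.base_metric = base_metric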

        self.correlated_dict = {}
        self.filtered_varlist = []

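    def filter_single_iteration(self, varlist):
        """Filter highly correlated variables in a single pass.

        Runs one round of correlation filtering over the variable list, keeping
        the variable with the highest base_metric value (IV by default) in each
        group of correlated variables.

        Parameters
        ----------
        varlist : list
            List of variable names to screen

        Returns
        -------
        list
            List of variable names kept after filtering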
        """
        base_metric = self.base_metric.lower()

        name_mapping = {
            "iv": "iv",
            "ks": "ks_in_gains"
        }

        high_corr_var = var_corr_filter(
            self.data, varlist,
            corr_cutpoint=self.corr_cutpoint,
            method=self.method
        )

        if len(high_corr_var) == 0:
            return varlist

        base_varlist = high_corr_var['VAR1'].drop_duplicates().tolist()

        correlated_dict = self.correlated_dict
        selected_varlist = []
        removed_varlist = []
        for var in tqdm(base_varlist):
            if var not in set(removed_varlist + selected_varlist):                
                single_var_corr = high_corr_var.query(f""" VAR1 == '{var}'""")                
                correlated_list = [var] + single_var_corr['VAR2'].drop_duplicates().tolist()

                varInsights = VarExtractionInsights(data = self.data,
                                                    dep = self.dep, 
                                                    plot_path = None, 
                                                    nbins = 10, 
                                                    equal_freq = True, 
                                                    min_bin_prop = 0.05, 
                                                    precision = 5, 
                                                    chi2_method = self.chi2_method, 
                                                    chi2_p = self.chi2_p, 
                                                    init_equi_bins = self.init_equi_bins, 
                                                    tree_binning = self.tree_binning, 
                                                    include_missing = True, 
                                                    seed = self.seed, 
                                                    missing_rate_ref = self.missing_rate_ref)

                fnl_summary = varInsights.get_var_analysis_report(data = self.data, varlist = correlated_list, dep = self.dep, iv_cut = 0)
                # Use .iloc[0] to take the top row after sorting; ['var'][0] looks up index label 0 instead.
                fnl_selected_var = fnl_summary.sort_values([name_mapping[base_metric]], ascending = False)['var'].iloc[0]

                if fnl_selected_var not in selected_varlist:
                    selected_varlist.append(fnl_selected_var)

                removed_varlist += [x for x in correlated_list if x != fnl_selected_var and x not in removed_varlist]

                if var not in correlated_dict:
                    correlated_dict[var] = {}
                    correlated_dict[var]['corr'] = single_var_corr
                    correlated_dict[var]['gains'] = fnl_summary
                else:
                    correlated_dict[var]['corr'] = pd.concat([correlated_dict[var]['corr'], single_var_corr]).drop_duplicates()
                    correlated_dict[var]['gains'] = pd.concat([correlated_dict[var]['gains'], fnl_summary]).drop_duplicates()

        other_varlist = [x for x in varlist if x not in (selected_varlist + removed_varlist)]
        fnl_keep_varlist = selected_varlist + other_varlist

        self.correlated_dict = correlated_dict

        return fnl_keep_varlist

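    def remove_highly_correlated(self, varlist, max_iterations=10):
        """Iteratively remove highly correlated variables.

        Repeats the correlation filter until a pass removes no variables
        or the maximum number of iterations is reached.

        Parameters
        ----------
        varlist : list
            List of variable names to filter.
        max_iterations : int, optional
            Maximum number of iterations. Default is 10.

        Returns
        -------
        list
            Variable names kept after all iterations.

        Examples
        --------
        >>> filter_analyzer = CorrelationFilter(df, 'target')
        >>> keep_vars = filter_analyzer.remove_highly_correlated(['var1', 'var2', 'var3'])
        """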
        last_keep_list = self.filter_single_iteration(varlist)

        for i in range(1, max_iterations):
            fnl_keep_list = self.filter_single_iteration(last_keep_list)

            removed_vars = [x for x in last_keep_list if x not in fnl_keep_list]
            self.filtered_varlist.append(removed_vars)
            if len(removed_vars) == 0:
                break

            last_keep_list = fnl_keep_list

        self.filtered_varlist = [x for x in varlist if x not in last_keep_list]
        return last_keep_list


    @staticmethod
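    def calculate_vif(df):
        """Compute the variance inflation factor (VIF).

        Used to detect multicollinearity; returns the VIF of each variable.
        A larger VIF means stronger collinearity; VIF > 10 is commonly
        treated as severe collinearity.

        Parameters
        ----------
        df : pd.DataFrame
            Data frame containing the independent variables.

        Returns
        -------
        pd.DataFrame
            Data frame with the columns:
            - index: variable name
            - VIF: variance inflation factor

        Examples
        --------
        >>> vif_result = CorrelationFilter.calculate_vif(X_train)
        >>> high_vif_vars = vif_result[vif_result['VIF'] > 10]['index'].tolist()
        """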
        from statsmodels.stats.outliers_influence import variance_inflation_factor

        vif = pd.DataFrame()
        vif['index'] = df.columns
        vif['VIF'] = [variance_inflation_factor(df.values, i) for i in range(df.shape[1])]
        return vif

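filter_single_iteration

filter_single_iteration(varlist)

Run a single pass of high-correlation filtering.

Applies one round of correlation filtering to the variable list and, within each correlated group, keeps the variable with the highest IV (or KS when base_metric is 'ks').

Parameters:

Name | Type | Description | Default
varlist | list | List of variable names to filter. | required

Returns:

Type | Description
list | Variable names kept after filtering.

Source code in: Modeling_Tool/Feature/Feature_Insights.py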
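def filter_single_iteration(self, varlist):
    """Run a single pass of high-correlation filtering.

    Applies one round of correlation filtering to the variable list and,
    within each correlated group, keeps the variable with the highest IV
    (or KS when base_metric is 'ks').

    Parameters
    ----------
    varlist : list
        List of variable names to filter.

    Returns
    -------
    list
        Variable names kept after filtering.
    """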
    base_metric = self.base_metric.lower()

    name_mapping = {
        "iv": "iv",
        "ks": "ks_in_gains"
    }

    high_corr_var = var_corr_filter(
        self.data, varlist,
        corr_cutpoint=self.corr_cutpoint,
        method=self.method
    )

    if len(high_corr_var) == 0:
        return varlist

    base_varlist = high_corr_var['VAR1'].drop_duplicates().tolist()

    correlated_dict = self.correlated_dict
    selected_varlist = []
    removed_varlist = []
    for var in tqdm(base_varlist):
        if var not in set(removed_varlist + selected_varlist):                
            single_var_corr = high_corr_var.query(f""" VAR1 == '{var}'""")                
            correlated_list = [var] + single_var_corr['VAR2'].drop_duplicates().tolist()

            varInsights = VarExtractionInsights(data = self.data,
                                                dep = self.dep, 
                                                plot_path = None, 
                                                nbins = 10, 
                                                equal_freq = True, 
                                                min_bin_prop = 0.05, 
                                                precision = 5, 
                                                chi2_method = self.chi2_method, 
                                                chi2_p = self.chi2_p, 
                                                init_equi_bins = self.init_equi_bins, 
                                                tree_binning = self.tree_binning, 
                                                include_missing = True, 
                                                seed = self.seed, 
                                                missing_rate_ref = self.missing_rate_ref)

            fnl_summary = varInsights.get_var_analysis_report(data = self.data, varlist = correlated_list, dep = self.dep, iv_cut = 0)
            # Use .iloc[0] to take the top row after sorting; ['var'][0] looks up index label 0 instead.
            fnl_selected_var = fnl_summary.sort_values([name_mapping[base_metric]], ascending = False)['var'].iloc[0]

            if fnl_selected_var not in selected_varlist:
                selected_varlist.append(fnl_selected_var)

            removed_varlist += [x for x in correlated_list if x != fnl_selected_var and x not in removed_varlist]

            if var not in correlated_dict:
                correlated_dict[var] = {}
                correlated_dict[var]['corr'] = single_var_corr
                correlated_dict[var]['gains'] = fnl_summary
            else:
                correlated_dict[var]['corr'] = pd.concat([correlated_dict[var]['corr'], single_var_corr]).drop_duplicates()
                correlated_dict[var]['gains'] = pd.concat([correlated_dict[var]['gains'], fnl_summary]).drop_duplicates()

    other_varlist = [x for x in varlist if x not in (selected_varlist + removed_varlist)]
    fnl_keep_varlist = selected_varlist + other_varlist

    self.correlated_dict = correlated_dict

    return fnl_keep_varlist
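
The selection rule in a single pass can be illustrated without the library. The sketch below is a simplified stand-in, not the package's implementation: `filter_single_pass`, the `(VAR1, VAR2)` pair list, and the `metric` dictionary (standing in for the IV or KS column of the analysis report) are all hypothetical names introduced for illustration.

```python
def filter_single_pass(varlist, high_corr_pairs, metric):
    """One greedy pass: in each correlated group, keep the best-scoring variable.

    high_corr_pairs: list of (VAR1, VAR2) tuples flagged as highly correlated.
    metric: dict mapping a variable name to its score (for example IV).
    """
    selected, removed = [], []
    # Unique VAR1 values, in first-seen order.
    base_vars = list(dict.fromkeys(v1 for v1, _ in high_corr_pairs))
    for var in base_vars:
        if var in selected or var in removed:
            continue  # already decided while handling an earlier group
        partners = [v2 for v1, v2 in high_corr_pairs if v1 == var]
        group = [var] + list(dict.fromkeys(partners))
        best = max(group, key=lambda v: metric[v])
        if best not in selected:
            selected.append(best)
        removed += [v for v in group if v != best and v not in removed]
    # Variables that appear in no flagged pair pass through unchanged.
    untouched = [v for v in varlist if v not in selected and v not in removed]
    return selected + untouched


# Toy input: "a" is highly correlated with "b" and "c"; "d" stands alone.
kept = filter_single_pass(
    varlist=["a", "b", "c", "d"],
    high_corr_pairs=[("a", "b"), ("a", "c")],
    metric={"a": 0.10, "b": 0.30, "c": 0.05, "d": 0.20},
)
print(kept)
```

Here the group built around "a" is ["a", "b", "c"]; "b" has the highest score, so "a" and "c" are dropped while "d" is kept because it is in no flagged pair.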

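remove_highly_correlated

remove_highly_correlated(varlist, max_iterations=10)

Iteratively remove highly correlated variables.

Repeats the correlation filter until a pass removes no variables or the maximum number of iterations is reached.

Parameters:

Name | Type | Description | Default
varlist | list | List of variable names to filter. | required
max_iterations | int | Maximum number of iterations. | 10

Returns:

Type | Description
list | Variable names kept after all iterations.

Example:

>>> filter_analyzer = CorrelationFilter(df, 'target')
>>> keep_vars = filter_analyzer.remove_highly_correlated(['var1', 'var2', 'var3'])
Source code in: Modeling_Tool/Feature/Feature_Insights.py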
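def remove_highly_correlated(self, varlist, max_iterations=10):
    """Iteratively remove highly correlated variables.

    Repeats the correlation filter until a pass removes no variables
    or the maximum number of iterations is reached.

    Parameters
    ----------
    varlist : list
        List of variable names to filter.
    max_iterations : int, optional
        Maximum number of iterations. Default is 10.

    Returns
    -------
    list
        Variable names kept after all iterations.

    Examples
    --------
    >>> filter_analyzer = CorrelationFilter(df, 'target')
    >>> keep_vars = filter_analyzer.remove_highly_correlated(['var1', 'var2', 'var3'])
    """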
    last_keep_list = self.filter_single_iteration(varlist)

    for i in range(1, max_iterations):
        fnl_keep_list = self.filter_single_iteration(last_keep_list)

        removed_vars = [x for x in last_keep_list if x not in fnl_keep_list]
        self.filtered_varlist.append(removed_vars)
        if len(removed_vars) == 0:
            break

        last_keep_list = fnl_keep_list

    self.filtered_varlist = [x for x in varlist if x not in last_keep_list]
    return last_keep_list
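
The stopping rule, repeat until a pass removes nothing or the iteration cap is reached, can be sketched generically. Both `iterate_until_stable` and the toy pass `drop_one_redundant` are hypothetical names for illustration; the real single pass is `filter_single_iteration`.

```python
def iterate_until_stable(varlist, single_pass, max_iterations=10):
    """Apply `single_pass` until it removes nothing or the cap is reached."""
    keep = list(varlist)
    for _ in range(max_iterations):
        new_keep = single_pass(keep)
        removed_now = [v for v in keep if v not in new_keep]
        if not removed_now:
            break  # converged: this pass removed nothing
        keep = new_keep
    removed = [v for v in varlist if v not in keep]
    return keep, removed


def drop_one_redundant(varlist):
    """Toy pass: drop at most one variable marked as redundant."""
    redundant = {"x2", "x3"}
    for v in varlist:
        if v in redundant:
            return [u for u in varlist if u != v]
    return varlist


keep, removed = iterate_until_stable(["x1", "x2", "x3", "x4"], drop_one_redundant)
print(keep, removed)
```

The toy pass removes one variable per call, so two passes do the work and the third detects convergence. Several passes can be needed in practice because one pass only resolves the groups it happens to visit.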

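calculate_vif staticmethod

calculate_vif(df)

Compute the variance inflation factor (VIF).

Used to detect multicollinearity; returns the VIF of each variable. A larger VIF means stronger collinearity; VIF > 10 is commonly treated as severe collinearity.

Parameters:

Name | Type | Description | Default
df | DataFrame | Data frame containing the independent variables. | required

Returns:

Type | Description
DataFrame | Data frame with the columns index (variable name) and VIF (variance inflation factor).

Example:

>>> vif_result = CorrelationFilter.calculate_vif(X_train)
>>> high_vif_vars = vif_result[vif_result['VIF'] > 10]['index'].tolist()
Source code in: Modeling_Tool/Feature/Feature_Insights.py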
@staticmethod
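def calculate_vif(df):
    """Compute the variance inflation factor (VIF).

    Used to detect multicollinearity; returns the VIF of each variable.
    A larger VIF means stronger collinearity; VIF > 10 is commonly
    treated as severe collinearity.

    Parameters
    ----------
    df : pd.DataFrame
        Data frame containing the independent variables.

    Returns
    -------
    pd.DataFrame
        Data frame with the columns:
        - index: variable name
        - VIF: variance inflation factor

    Examples
    --------
    >>> vif_result = CorrelationFilter.calculate_vif(X_train)
    >>> high_vif_vars = vif_result[vif_result['VIF'] > 10]['index'].tolist()
    """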
    from statsmodels.stats.outliers_influence import variance_inflation_factor

    vif = pd.DataFrame()
    vif['index'] = df.columns
    vif['VIF'] = [variance_inflation_factor(df.values, i) for i in range(df.shape[1])]
    return vif
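
To show what the VIF measures, the sketch below computes it with NumPy alone, using the identity that the VIF of column j is the j-th diagonal entry of the inverse correlation matrix, which equals 1 / (1 - R_j^2) when column j is regressed on the others with an intercept. `vif_from_correlation` is a hypothetical helper, not part of the package. As far as can be told, statsmodels' `variance_inflation_factor` does not add an intercept itself, so `calculate_vif` may return different values unless `df` already contains a constant column.

```python
import numpy as np


def vif_from_correlation(X):
    """Centered VIF of each column of X (rows are observations)."""
    corr = np.corrcoef(np.asarray(X, dtype=float), rowvar=False)
    # Diagonal of the inverse correlation matrix gives 1 / (1 - R_j^2).
    return np.diag(np.linalg.inv(corr))


rng = np.random.default_rng(0)
x1 = rng.normal(size=500)
x2 = rng.normal(size=500)                 # independent of x1
x3 = x1 + 0.1 * rng.normal(size=500)      # almost a copy of x1

vif = vif_from_correlation(np.column_stack([x1, x2, x3]))
print(dict(zip(["x1", "x2", "x3"], np.round(vif, 2))))
```

With this toy data, x1 and x3 get large VIF values because each is almost fully explained by the other, while x2 stays close to 1.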

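var_corr_filter

var_corr_filter(data, varlist, corr_cutpoint=0.8, method='pearson')

Find highly correlated variable pairs.

Computes the pairwise correlation between variables and returns the pairs whose absolute correlation exceeds the threshold.

Parameters:

Name | Type | Description | Default
data | DataFrame | Input data frame. | required
varlist | list | List of variable names to check. | required
corr_cutpoint | float | Correlation threshold. | 0.8
method | str | Correlation method: 'pearson', 'spearman', or 'kendall'. | 'pearson'

Returns:

Type | Description
DataFrame | Data frame of highly correlated pairs with the columns VAR1 (first variable), VAR2 (second variable), and CORR (correlation coefficient).

Example:

>>> high_corr = var_corr_filter(df, ['var1', 'var2', 'var3'], corr_cutpoint=0.8)
Source code in: Modeling_Tool/Feature/Feature_Insights.py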
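def var_corr_filter(data, varlist, corr_cutpoint=0.8, method='pearson'):
    """Find highly correlated variable pairs.

    Computes the pairwise correlation between variables and returns the
    pairs whose absolute correlation exceeds the threshold.

    Parameters
    ----------
    data : pd.DataFrame
        Input data frame.
    varlist : list
        List of variable names to check.
    corr_cutpoint : float, optional
        Correlation threshold. Default is 0.8.
    method : str, optional
        Correlation method: 'pearson', 'spearman', or 'kendall'.
        Default is 'pearson'.

    Returns
    -------
    pd.DataFrame
        Data frame of highly correlated pairs with the columns:
        - VAR1: first variable
        - VAR2: second variable
        - CORR: correlation coefficient

    Examples
    --------
    >>> high_corr = var_corr_filter(df, ['var1', 'var2', 'var3'], corr_cutpoint=0.8)
    """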

    corr_matrix = data[varlist].corr(method=method)

    corr_list = []
    for i in range(len(varlist)):
        for j in range(i + 1, len(varlist)):
            var1, var2 = varlist[i], varlist[j]
            corr_value = corr_matrix.iloc[i, j]
            if abs(corr_value) > corr_cutpoint:
                corr_list.append({
                    'VAR1': var1,
                    'VAR2': var2,
                    'CORR': corr_value
                })

    return pd.DataFrame(corr_list)
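
The pair scan can be reproduced with the standard library to make the thresholding rule explicit. This is a minimal sketch on toy data, assuming Pearson correlation only; `pearson` and `high_corr_pairs` are hypothetical helpers, not package functions.

```python
from itertools import combinations
from math import sqrt


def pearson(x, y):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(x)
    mean_x, mean_y = sum(x) / n, sum(y) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    var_x = sum((a - mean_x) ** 2 for a in x)
    var_y = sum((b - mean_y) ** 2 for b in y)
    return cov / sqrt(var_x * var_y)


def high_corr_pairs(columns, corr_cutpoint=0.8):
    """Return (VAR1, VAR2, CORR) for each pair with |CORR| > corr_cutpoint.

    `columns` maps a variable name to its list of values. Every unordered
    pair is visited once, like an upper-triangle scan of the matrix.
    """
    pairs = []
    for var1, var2 in combinations(columns, 2):
        corr = pearson(columns[var1], columns[var2])
        if abs(corr) > corr_cutpoint:
            pairs.append((var1, var2, corr))
    return pairs


toy = {
    "var1": [1, 2, 3, 4, 5],
    "var2": [2, 4, 6, 8, 11],   # nearly a multiple of var1
    "var3": [5, 1, 4, 2, 3],    # only weakly related to var1
}
pairs = high_corr_pairs(toy)
print(pairs)
```

Only the pair (var1, var2) is flagged. Because the comparison uses the absolute value, a strongly negative correlation would be flagged as well.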

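Distribution Analysis - Distribution_Tool

Distribution_Tool

Data processing and analysis toolkit. Provides grouped statistics, distribution analysis, and visualization.

proc_means

Proc Means by Group.

Computes descriptive statistics of numeric variables by group, including the mean, quantiles, and missing rate.

Parameters:

Name | Type | Description | Default
data | DataFrame | Input data frame. | required
varlist | list | Names of the numeric variables to summarize. | required
groupby | list | Names of the grouping variables. | required
spec_missing_value | any | Special value to treat as missing. | None

Example:

>>> pm = proc_means(df, ['age', 'score'], ['gender'])
>>> result = pm()
Source code in: Modeling_Tool/Feature/Distribution_Tool.py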
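class proc_means:
    """Proc Means by Group.

    Computes descriptive statistics of numeric variables by group,
    including the mean, quantiles, and missing rate.

    Parameters
    ----------
    data : pd.DataFrame
        Input data frame.
    varlist : list
        Names of the numeric variables to summarize.
    groupby : list
        Names of the grouping variables.
    spec_missing_value : any, optional
        Special value to treat as missing. Default is None.

    Examples
    --------
    >>> pm = proc_means(df, ['age', 'score'], ['gender'])
    >>> result = pm()
    """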

    def __init__(self, data, varlist, groupby, spec_missing_value=None):
        """Initialize the proc_means object.

        Parameters
        ----------
        data : pd.DataFrame
            Input data frame.
        varlist : list
            Names of the numeric variables to summarize.
        groupby : list
            Names of the grouping variables.
        spec_missing_value : any, optional
            Special value to treat as missing.
        """
        self.data = data
        self.varlist = varlist
        self.groupby = groupby
        self.spec_missing_value = spec_missing_value

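    def treat_spec_missing(self):
        """Handle the special missing value.

        Replaces every occurrence of self.spec_missing_value with np.nan
        so that the statistics are computed correctly.

        Returns
        -------
        pd.DataFrame
            Data frame with the special value replaced.
        """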
        if self.spec_missing_value is not None:
            self.data = self.data.replace(self.spec_missing_value, np.nan)
        return self.data

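    def group_means(self, q=None):
        """Compute descriptive statistics by group.

        Computes the count, mean, standard deviation, minimum, maximum,
        and the requested quantiles of each variable within each group.

        Parameters
        ----------
        q : list, optional
            Quantiles to report. Default is [0.05, 0.15, 0.25, 0.5, 0.75, 0.95, 0.99].

        Returns
        -------
        pd.DataFrame
            Data frame of descriptive statistics.
        """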
        if q is None:
            q = [0.05, 0.15, 0.25, 0.5, 0.75, 0.95, 0.99]

        data_w_varlist = self.data[self.groupby + self.varlist]

        grouped_data = data_w_varlist\
            .melt(id_vars=self.groupby,
                  value_vars=self.varlist,
                  var_name="attribute",
                  value_name="value")\
            .groupby(self.groupby + ["attribute"])
        res_describe = grouped_data.describe(percentiles=q)
        res_describe = res_describe.droplevel(level=0, axis=1)
        return res_describe

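    def group_sum(self):
        """Count the rows in each group.

        Counts the observations (total rows, missing values included)
        in each combination of grouping values.

        Returns
        -------
        pd.DataFrame
            Aggregated result with the row count of each group.
        """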
        data = self.data.copy()
        data["_sum_ind"] = 1
        data = data[self.groupby + self.varlist + ["_sum_ind"]]
        grouped_data = data\
            .melt(id_vars=self.groupby + ["_sum_ind"],
                  value_vars=self.varlist,
                  var_name="attribute",
                  value_name="value")\
            .groupby(self.groupby + ["attribute"])
        res_sum = grouped_data.agg(sum_all=("_sum_ind", "sum"))
        return res_sum

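    def __call__(self, q=None):
        """Run the full grouped analysis.

        Combines the grouped statistics into one report: N (non-missing
        count), N_ALL (total rows), mean, standard deviation, quantiles,
        and missing rate.

        Parameters
        ----------
        q : list, optional
            Quantiles to report. Default is [0.05, 0.15, 0.25, 0.5, 0.75, 0.95, 0.99].

        Returns
        -------
        pd.DataFrame
            Full grouped report with N, N_ALL, the quantiles, and the missing rate.
        """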
        if q is None:
            q = [0.05, 0.15, 0.25, 0.5, 0.75, 0.95, 0.99]

        self.data = self.treat_spec_missing()
        sum_total = self.group_sum()
        means = self.group_means(q=q)
        res_fnl = sum_total.merge(means, left_index=True, right_index=True)
        res_fnl["missing_rate"] = 1 - res_fnl["count"] / res_fnl["sum_all"]
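        # round() before int(): int(0.29 * 100) is 28 because 0.29 * 100 == 28.999999999999996.
        quantile_rename = {str(int(round(x * 100))) + "%": "Q" + str(int(round(x * 100))) for x in q}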
        res_fnl = res_fnl.rename(columns=quantile_rename)
        res_fnl = res_fnl.rename(columns={"count": "N", "sum_all": "N_ALL"})
        res_fnl.columns = [x.upper() for x in res_fnl.columns]
        return res_fnl
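
The relationship between N, N_ALL, and the missing rate in the report can be seen on a small data frame. This is a simplified sketch that uses plain pandas group-by calls rather than the melt-and-describe pipeline above; the column names and the -999 sentinel are invented for illustration.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "gender": ["F", "F", "F", "M", "M"],
    "age": [30.0, -999.0, 41.0, 25.0, 38.0],   # -999 marks a missing age
})

# Step 1: turn the sentinel into NaN so it is excluded from the statistics.
clean = df.replace(-999.0, np.nan)

# Step 2: per group, count non-missing values (N) and all rows (N_ALL).
grouped = clean.groupby("gender")["age"]
report = pd.DataFrame({
    "N": grouped.count(),      # non-missing observations
    "N_ALL": grouped.size(),   # every row in the group
    "MEAN": grouped.mean(),    # NaN values are skipped
})

# Step 3: the missing rate is the share of rows without a usable value.
report["MISSING_RATE"] = 1 - report["N"] / report["N_ALL"]
print(report)
```

Group F has three rows but only two usable ages, so its missing rate is one third and its mean is computed from 30 and 41 only.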

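treat_spec_missing

treat_spec_missing()

Handle the special missing value.

Replaces every occurrence of self.spec_missing_value with np.nan so that the statistics are computed correctly.

Returns:

Type | Description
DataFrame | Data frame with the special value replaced.

Source code in: Modeling_Tool/Feature/Distribution_Tool.py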
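def treat_spec_missing(self):
    """Handle the special missing value.

    Replaces every occurrence of self.spec_missing_value with np.nan
    so that the statistics are computed correctly.

    Returns
    -------
    pd.DataFrame
        Data frame with the special value replaced.
    """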
    if self.spec_missing_value is not None:
        self.data = self.data.replace(self.spec_missing_value, np.nan)
    return self.data

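group_means

group_means(q=None)

Compute descriptive statistics by group.

Computes the count, mean, standard deviation, minimum, maximum, and the requested quantiles of each variable within each group.

Parameters:

Name | Type | Description | Default
q | list | Quantiles to report. When None, [0.05, 0.15, 0.25, 0.5, 0.75, 0.95, 0.99] is used. | None

Returns:

Type | Description
DataFrame | Data frame of descriptive statistics.

Source code in: Modeling_Tool/Feature/Distribution_Tool.py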
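def group_means(self, q=None):
    """Compute descriptive statistics by group.

    Computes the count, mean, standard deviation, minimum, maximum,
    and the requested quantiles of each variable within each group.

    Parameters
    ----------
    q : list, optional
        Quantiles to report. Default is [0.05, 0.15, 0.25, 0.5, 0.75, 0.95, 0.99].

    Returns
    -------
    pd.DataFrame
        Data frame of descriptive statistics.
    """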
    if q is None:
        q = [0.05, 0.15, 0.25, 0.5, 0.75, 0.95, 0.99]

    data_w_varlist = self.data[self.groupby + self.varlist]

    grouped_data = data_w_varlist\
        .melt(id_vars=self.groupby,
              value_vars=self.varlist,
              var_name="attribute",
              value_name="value")\
        .groupby(self.groupby + ["attribute"])
    res_describe = grouped_data.describe(percentiles=q)
    res_describe = res_describe.droplevel(level=0, axis=1)
    return res_describe

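group_sum

group_sum()

Count the rows in each group.

Counts the observations (total rows, missing values included) in each combination of grouping values.

Returns:

Type | Description
DataFrame | Aggregated result with the row count of each group.

Source code in: Modeling_Tool/Feature/Distribution_Tool.py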
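def group_sum(self):
    """Count the rows in each group.

    Counts the observations (total rows, missing values included)
    in each combination of grouping values.

    Returns
    -------
    pd.DataFrame
        Aggregated result with the row count of each group.
    """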
    data = self.data.copy()
    data["_sum_ind"] = 1
    data = data[self.groupby + self.varlist + ["_sum_ind"]]
    grouped_data = data\
        .melt(id_vars=self.groupby + ["_sum_ind"],
              value_vars=self.varlist,
              var_name="attribute",
              value_name="value")\
        .groupby(self.groupby + ["attribute"])
    res_sum = grouped_data.agg(sum_all=("_sum_ind", "sum"))
    return res_sum

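DistributionShiftAnalyzer

Distribution shift analyzer.

Analyzes how a variable's distribution shifts between groups by comparing, for each group, the share of observations above the benchmark group's outlier threshold.

Parameters:

Name | Type | Description | Default
data | DataFrame | Input data frame. | required
grp_name | str | Name of the grouping variable. | required
benchmark_value | any | Value of the grouping variable that identifies the benchmark group, used to set the outlier threshold. | required

Example:

>>> analyzer = DistributionShiftAnalyzer(df, 'gender', 'Male')
>>> result = analyzer.analyze(['age', 'score'])
Source code in: Modeling_Tool/Feature/Distribution_Tool.py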
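class DistributionShiftAnalyzer:
    """Distribution shift analyzer.

    Analyzes how a variable's distribution shifts between groups by
    comparing, for each group, the share of observations above the
    benchmark group's outlier threshold.

    Parameters
    ----------
    data : pd.DataFrame
        Input data frame.
    grp_name : str
        Name of the grouping variable.
    benchmark_value : any
        Value of the grouping variable that identifies the benchmark
        group, used to set the outlier threshold.

    Examples
    --------
    >>> analyzer = DistributionShiftAnalyzer(df, 'gender', 'Male')
    >>> result = analyzer.analyze(['age', 'score'])
    """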

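    def __init__(self, data, grp_name, benchmark_value):
        """Initialize the distribution shift analyzer.

        Parameters
        ----------
        data : pd.DataFrame
            Input data frame.
        grp_name : str
            Name of the grouping variable.
        benchmark_value : any
            Value of the grouping variable that identifies the benchmark group.
        """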
        self.data = data
        self.grp_name = grp_name
        self.benchmark_value = benchmark_value

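    def analyze_single_var(self, var, outlier_value=0.99):
        """Analyze the distribution shift of a single variable.

        Computes, for each group, the share of observations above the
        given quantile of the benchmark group.

        Parameters
        ----------
        var : str
            Name of the variable to analyze.
        outlier_value : float, optional
            Quantile used as the outlier threshold. Default is 0.99.

        Returns
        -------
        dict
            Maps each group value to the share of observations above the threshold.
        """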
        means_rpt = proc_means_by_grp(
            self.data, [var], [self.grp_name],
            spec_missing_value=None, q=[outlier_value]
        )

        # round() before int() so that e.g. 0.29 maps to Q29 rather than Q28.
        outlier_name = f'Q{int(round(outlier_value * 100))}'
        outlier_threshold = means_rpt[
            means_rpt[self.grp_name] == self.benchmark_value
        ][outlier_name].iloc[0]

        res_dict = {}
        for group, group_data in self.data.groupby(self.grp_name):
            cnt = group_data[group_data[var] > outlier_threshold].shape[0]
            prop = round(cnt / group_data.shape[0], 4)
            res_dict[group] = prop
        return res_dict

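    def analyze(self, varlist, outlier_value=0.99):
        """Analyze the distribution shift of several variables.

        For each variable in the list, computes the share of each group's
        observations above the benchmark threshold and returns all results
        as one data frame.

        Parameters
        ----------
        varlist : list
            Names of the variables to analyze.
        outlier_value : float, optional
            Quantile used as the outlier threshold. Default is 0.99.

        Returns
        -------
        pd.DataFrame
            Rows are variable names, columns are group values, and cells
            hold the share of observations above the threshold.

        Examples
        --------
        >>> analyzer = DistributionShiftAnalyzer(df, 'gender', 'Male')
        >>> result = analyzer.analyze(['age', 'score'])
        """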
        res_dict = {}
        for var in varlist:
            res = self.analyze_single_var(var=var, outlier_value=outlier_value)
            res_dict[var] = res
        return pd.DataFrame(res_dict).T

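analyze_single_var

analyze_single_var(var, outlier_value=0.99)

Analyze the distribution shift of a single variable.

Computes, for each group, the share of observations above the given quantile of the benchmark group.

Parameters:

Name | Type | Description | Default
var | str | Name of the variable to analyze. | required
outlier_value | float | Quantile used as the outlier threshold. | 0.99

Returns:

Type | Description
dict | Maps each group value to the share of observations above the threshold.

Source code in: Modeling_Tool/Feature/Distribution_Tool.py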
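def analyze_single_var(self, var, outlier_value=0.99):
    """Analyze the distribution shift of a single variable.

    Computes, for each group, the share of observations above the
    given quantile of the benchmark group.

    Parameters
    ----------
    var : str
        Name of the variable to analyze.
    outlier_value : float, optional
        Quantile used as the outlier threshold. Default is 0.99.

    Returns
    -------
    dict
        Maps each group value to the share of observations above the threshold.
    """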
    means_rpt = proc_means_by_grp(
        self.data, [var], [self.grp_name],
        spec_missing_value=None, q=[outlier_value]
    )

    # round() before int() so that e.g. 0.29 maps to Q29 rather than Q28.
    outlier_name = f'Q{int(round(outlier_value * 100))}'
    outlier_threshold = means_rpt[
        means_rpt[self.grp_name] == self.benchmark_value
    ][outlier_name].iloc[0]

    res_dict = {}
    for group, group_data in self.data.groupby(self.grp_name):
        cnt = group_data[group_data[var] > outlier_threshold].shape[0]
        prop = round(cnt / group_data.shape[0], 4)
        res_dict[group] = prop
    return res_dict
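
The measure itself is simple: take a high quantile of the benchmark group as the threshold, then report the share of each group above it. The sketch below shows this with the standard library on toy data; `quantile` and `share_above_benchmark` are hypothetical helpers, and the group labels are invented.

```python
def quantile(values, q):
    """Quantile by linear interpolation between the two nearest order statistics."""
    ordered = sorted(values)
    pos = q * (len(ordered) - 1)
    lo = int(pos)
    hi = min(lo + 1, len(ordered) - 1)
    return ordered[lo] + (pos - lo) * (ordered[hi] - ordered[lo])


def share_above_benchmark(groups, benchmark, q=0.99):
    """Share of each group's values above the benchmark group's q-quantile.

    groups: dict mapping a group label to its list of values.
    """
    threshold = quantile(groups[benchmark], q)
    return {
        label: round(sum(v > threshold for v in vals) / len(vals), 4)
        for label, vals in groups.items()
    }


groups = {
    "train": list(range(1, 101)),     # values 1..100
    "recent": list(range(51, 151)),   # same shape, shifted up by 50
}
shift = share_above_benchmark(groups, benchmark="train", q=0.9)
print(shift)
```

By construction about 10% of the benchmark group lies above its own 0.9 quantile, so a much larger share in another group (60% here) signals an upward shift.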

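analyze

analyze(varlist, outlier_value=0.99)

Analyze the distribution shift of several variables.

For each variable in the list, computes the share of each group's observations above the benchmark threshold and returns all results as one data frame.

Parameters:

Name | Type | Description | Default
varlist | list | Names of the variables to analyze. | required
outlier_value | float | Quantile used as the outlier threshold. | 0.99

Returns:

Type | Description
DataFrame | Rows are variable names, columns are group values, and cells hold the share of observations above the threshold.

Example:

>>> analyzer = DistributionShiftAnalyzer(df, 'gender', 'Male')
>>> result = analyzer.analyze(['age', 'score'])
Source code in: Modeling_Tool/Feature/Distribution_Tool.py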
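def analyze(self, varlist, outlier_value=0.99):
    """Analyze the distribution shift of several variables.

    For each variable in the list, computes the share of each group's
    observations above the benchmark threshold and returns all results
    as one data frame.

    Parameters
    ----------
    varlist : list
        Names of the variables to analyze.
    outlier_value : float, optional
        Quantile used as the outlier threshold. Default is 0.99.

    Returns
    -------
    pd.DataFrame
        Rows are variable names, columns are group values, and cells
        hold the share of observations above the threshold.

    Examples
    --------
    >>> analyzer = DistributionShiftAnalyzer(df, 'gender', 'Male')
    >>> result = analyzer.analyze(['age', 'score'])
    """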
    res_dict = {}
    for var in varlist:
        res = self.analyze_single_var(var=var, outlier_value=outlier_value)
        res_dict[var] = res
    return pd.DataFrame(res_dict).T

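DistributionPlotter

Distribution plotter.

Visualizes the distribution of a numeric variable as a kernel density plot, a histogram, or a rug plot.

Parameters:

Name | Type | Description | Default
data | DataFrame | Input data frame. | required
score | str | Name of the variable whose distribution is plotted. | required

Example:

>>> plotter = DistributionPlotter(df, 'age')
>>> plotter.plot(method='kdeplot', title='Age Distribution')
Source code in: Modeling_Tool/Feature/Distribution_Tool.py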
class DistributionPlotter:
    """Distribution plotter.

    Visualizes the distribution of a numeric variable as a kernel
    density plot, a histogram, or a rug plot.

    Parameters
    ----------
    data : pd.DataFrame
        Input data frame.
    score : str
        Name of the variable whose distribution is plotted.

    Examples
    --------
    >>> plotter = DistributionPlotter(df, 'age')
    >>> plotter.plot(method='kdeplot', title='Age Distribution')
    """

    def __init__(self, data, score):
        """Initialize the distribution plotter.

        Parameters
        ----------
        data : pd.DataFrame
            Input data frame.
        score : str
            Name of the variable whose distribution is plotted.
        """
        self.data = data
        self.score = score
        self.plot_series = data[score]

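    def plot_rugplot(self, figsize=(15, 15), title="Distribution Plot"):
        """Draw a rug plot.

        Overlays a rug plot on a kernel density estimate to show where
        the observations are concentrated.

        Parameters
        ----------
        figsize : tuple, optional
            Figure size. Default is (15, 15).
        title : str, optional
            Figure title. Default is "Distribution Plot".
        """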
        plt.figure(figsize=figsize)
        sns.kdeplot(self.plot_series, color='purple')
        sns.rugplot(self.plot_series, color='purple')
        plt.title(title)
        plt.xlabel(self.score)
        plt.ylabel('Density')

    def plot_kdeplot(self, figsize=(15, 15), title="Distribution Plot"):
        """Draw a kernel density estimate plot.

        Shows the distribution as a filled kernel density estimate.

        Parameters
        ----------
        figsize : tuple, optional
            Figure size. Default is (15, 15).
        title : str, optional
            Figure title. Default is "Distribution Plot".
        """
        plt.figure(figsize=figsize)
        sns.kdeplot(self.plot_series, fill=True, color='orange')
        plt.title(title)
        plt.xlabel(self.score)
        plt.ylabel('Density')

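    def plot_displot(self, figsize=(15, 15), title="Distribution Plot", nbins=10):
        """Draw a distribution histogram.

        Draws a histogram with a kernel density estimate overlaid.

        Parameters
        ----------
        figsize : tuple, optional
            Figure size. Default is (15, 15).
        title : str, optional
            Figure title. Default is "Distribution Plot".
        nbins : int, optional
            Number of histogram bins. Default is 10.
        """
        plt.figure(figsize=figsize)
        # sns.displot is figure-level: it opens its own figure and ignores the one
        # created above. The axes-level sns.histplot draws on the current figure;
        # stat='density' matches the y-axis label.
        sns.histplot(self.plot_series, kde=True, bins=nbins, stat='density')
        plt.title(title)
        plt.xlabel(self.score)
        plt.ylabel('Density')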

    def plot(self, method='displot', title="Distribution Plot", figsize=(15, 15), nbins=10):
        """Draw the requested distribution plot.

        Plots the variable's distribution with the given method.

        Parameters
        ----------
        method : str, optional
            Plot type: 'rugplot', 'kdeplot', or 'displot'. Default is 'displot'.
        title : str, optional
            Figure title. Default is "Distribution Plot".
        figsize : tuple, optional
            Figure size. Default is (15, 15).
        nbins : int, optional
            Number of histogram bins (used by 'displot' only). Default is 10.

        Raises
        ------
        ValueError
            If an unsupported plot method is given.

        Examples
        --------
        >>> plotter = DistributionPlotter(df, 'age')
        >>> plotter.plot(method='kdeplot', title='Age Distribution')
        """
        if method == 'rugplot':
            self.plot_rugplot(figsize=figsize, title=title)
        elif method == 'kdeplot':
            self.plot_kdeplot(figsize=figsize, title=title)
        elif method == 'displot':
            self.plot_displot(figsize=figsize, title=title, nbins=nbins)
        else:
            raise ValueError(f"Unsupported method: {method}. Choose from 'rugplot', 'kdeplot', 'displot'.")

plot_rugplot

plot_rugplot(figsize=(15, 15), title='Distribution Plot')

Draw a rug plot.

Overlays a rug plot on a kernel density estimate to show where the observations are concentrated.

Parameters:

Name | Type | Description | Default
figsize | tuple | Figure size. | (15, 15)
title | str | Figure title. | 'Distribution Plot'

Source code in: Modeling_Tool/Feature/Distribution_Tool.py
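def plot_rugplot(self, figsize=(15, 15), title="Distribution Plot"):
    """Draw a rug plot.

    Overlays a rug plot on a kernel density estimate to show where
    the observations are concentrated.

    Parameters
    ----------
    figsize : tuple, optional
        Figure size. Default is (15, 15).
    title : str, optional
        Figure title. Default is "Distribution Plot".
    """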
    plt.figure(figsize=figsize)
    sns.kdeplot(self.plot_series, color='purple')
    sns.rugplot(self.plot_series, color='purple')
    plt.title(title)
    plt.xlabel(self.score)
    plt.ylabel('Density')

plot_kdeplot

plot_kdeplot(figsize=(15, 15), title='Distribution Plot')

Draw a kernel density estimate plot.

Shows the distribution as a filled kernel density estimate.

Parameters:

Name | Type | Description | Default
figsize | tuple | Figure size. | (15, 15)
title | str | Figure title. | 'Distribution Plot'

Source code in: Modeling_Tool/Feature/Distribution_Tool.py
def plot_kdeplot(self, figsize=(15, 15), title="Distribution Plot"):
    """Draw a kernel density estimate plot.

    Shows the distribution as a filled kernel density estimate.

    Parameters
    ----------
    figsize : tuple, optional
        Figure size. Default is (15, 15).
    title : str, optional
        Figure title. Default is "Distribution Plot".
    """
    plt.figure(figsize=figsize)
    sns.kdeplot(self.plot_series, fill=True, color='orange')
    plt.title(title)
    plt.xlabel(self.score)
    plt.ylabel('Density')

plot_displot

plot_displot(figsize=(15, 15), title='Distribution Plot', nbins=10)

Draw a distribution histogram.

Draws a histogram with a kernel density estimate overlaid.

Parameters:

Name | Type | Description | Default
figsize | tuple | Figure size. | (15, 15)
title | str | Figure title. | 'Distribution Plot'
nbins | int | Number of histogram bins. | 10

Source code in: Modeling_Tool/Feature/Distribution_Tool.py
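def plot_displot(self, figsize=(15, 15), title="Distribution Plot", nbins=10):
    """Draw a distribution histogram.

    Draws a histogram with a kernel density estimate overlaid.

    Parameters
    ----------
    figsize : tuple, optional
        Figure size. Default is (15, 15).
    title : str, optional
        Figure title. Default is "Distribution Plot".
    nbins : int, optional
        Number of histogram bins. Default is 10.
    """
    plt.figure(figsize=figsize)
    # sns.displot is figure-level: it opens its own figure and ignores the one
    # created above. The axes-level sns.histplot draws on the current figure;
    # stat='density' matches the y-axis label.
    sns.histplot(self.plot_series, kde=True, bins=nbins, stat='density')
    plt.title(title)
    plt.xlabel(self.score)
    plt.ylabel('Density')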

plot

plot(method='displot', title='Distribution Plot', figsize=(15, 15), nbins=10)

Draw the requested distribution plot.

Plots the variable's distribution with the given method.

Parameters:

Name | Type | Description | Default
method | str | Plot type: 'rugplot', 'kdeplot', or 'displot'. | 'displot'
title | str | Figure title. | 'Distribution Plot'
figsize | tuple | Figure size. | (15, 15)
nbins | int | Number of histogram bins (used by 'displot' only). | 10

Raises:

Type | Description
ValueError | If an unsupported plot method is given.

Example:

>>> plotter = DistributionPlotter(df, 'age')
>>> plotter.plot(method='kdeplot', title='Age Distribution')
Source code in: Modeling_Tool/Feature/Distribution_Tool.py
def plot(self, method='displot', title="Distribution Plot", figsize=(15, 15), nbins=10):
    """Draw the requested distribution plot.

    Plots the variable's distribution with the given method.

    Parameters
    ----------
    method : str, optional
        Plot type: 'rugplot', 'kdeplot', or 'displot'. Default is 'displot'.
    title : str, optional
        Figure title. Default is "Distribution Plot".
    figsize : tuple, optional
        Figure size. Default is (15, 15).
    nbins : int, optional
        Number of histogram bins (used by 'displot' only). Default is 10.

    Raises
    ------
    ValueError
        If an unsupported plot method is given.

    Examples
    --------
    >>> plotter = DistributionPlotter(df, 'age')
    >>> plotter.plot(method='kdeplot', title='Age Distribution')
    """
    if method == 'rugplot':
        self.plot_rugplot(figsize=figsize, title=title)
    elif method == 'kdeplot':
        self.plot_kdeplot(figsize=figsize, title=title)
    elif method == 'displot':
        self.plot_displot(figsize=figsize, title=title, nbins=nbins)
    else:
        raise ValueError(f"Unsupported method: {method}. Choose from 'rugplot', 'kdeplot', 'displot'.")
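
The if/elif chain above is one way to dispatch on a method name. An alternative, shown here only as a design sketch, is a lookup table that maps names to callables, so the error message lists the valid choices automatically. `make_dispatcher` and the toy handlers are hypothetical and do no plotting.

```python
def make_dispatcher(handlers):
    """Build a function that routes a method name to its handler.

    handlers: dict mapping a method name to a callable.
    """
    def dispatch(method, **kwargs):
        try:
            handler = handlers[method]
        except KeyError:
            choices = ", ".join(repr(name) for name in handlers)
            raise ValueError(
                f"Unsupported method: {method}. Choose from {choices}."
            ) from None
        return handler(**kwargs)
    return dispatch


# Toy handlers that return a label instead of drawing anything.
dispatch = make_dispatcher({
    "kdeplot": lambda title: f"kde:{title}",
    "displot": lambda title: f"hist:{title}",
})
print(dispatch("kdeplot", title="Age Distribution"))
```

Adding a new plot type then only requires a new dictionary entry, at the cost of every handler having to accept the same keyword arguments.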

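proc_means_by_grp

proc_means_by_grp(data, varlist, groupby=None, spec_missing_value=None, q=None)

Build a grouped statistics report.

Computes descriptive statistics of the given variables by group and returns a report with the sample count, mean, quantiles, and missing rate. The computation is delegated to the proc_means class.

Parameters:

Name | Type | Description | Default
data | DataFrame | Input data frame. | required
varlist | list | Names of the numeric variables to summarize. | required
groupby | list | Names of the grouping variables. When None, an empty list is used (no grouping). | None
spec_missing_value | any | Special value to treat as missing. | None
q | list | Quantiles to report. When None, [0.05, 0.15, 0.25, 0.5, 0.75, 0.95, 0.99] is used. | None

Returns:

Type | Description
DataFrame | Grouped report with the descriptive statistics of each variable.

Example:

>>> result = proc_means_by_grp(df, ['age', 'score'], ['gender'])
Source code in: Modeling_Tool/Feature/Distribution_Tool.py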
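def proc_means_by_grp(data, varlist, groupby=None, spec_missing_value=None, q=None):
    """Compute a grouped descriptive-statistics report.

    Computes descriptive statistics for the specified variables within each group
    and returns a report with sample counts, means, quantiles, and missing rates.
    The calculation is delegated to the proc_means class.

    Parameters
    ----------
    data : pd.DataFrame
        Input DataFrame.
    varlist : list
        Names of the numeric variables to summarize.
    groupby : list, optional
        Names of the grouping variables. Default is None, which is treated as
        an empty list (no grouping).
    spec_missing_value : any, optional
        Special value to treat as missing. Default is None.
    q : list, optional
        Quantiles to compute. Default is None, which resolves to
        [0.05, 0.15, 0.25, 0.5, 0.75, 0.95, 0.99].

    Returns
    -------
    pd.DataFrame
        Grouped report containing the descriptive statistics for each variable.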

    Examples
    --------
    >>> result = proc_means_by_grp(df, ['age', 'score'], ['gender'])
    """
    if groupby is None:
        groupby = []
    if q is None:
        q = [0.05, 0.15, 0.25, 0.5, 0.75, 0.95, 0.99]

    means = proc_means(data, varlist, groupby=groupby, spec_missing_value=spec_missing_value)
    means_rpt = means(q=q)
    means_rpt = means_rpt.reset_index(drop=False)

    return means_rpt
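
The internals of `proc_means` are not shown in this section, so the following is only a rough pandas sketch of the kind of report described above: count, mean, quantiles and missing rate per group. The function name `grouped_summary`, its output column names, and the toy data are assumptions made for illustration; they are not the library's actual output format.

```python
import numpy as np
import pandas as pd


def grouped_summary(data, varlist, groupby, spec_missing_value=None, q=(0.25, 0.5, 0.75)):
    """Per-group size, mean, quantiles and missing rate for each variable."""
    df = data.copy()
    if spec_missing_value is not None:
        # Treat the sentinel value as missing before computing any statistic.
        df[varlist] = df[varlist].replace(spec_missing_value, np.nan)

    rows = []
    for keys, grp in df.groupby(groupby):
        # Group keys are tuples or scalars depending on the pandas version.
        keys = keys if isinstance(keys, tuple) else (keys,)
        for var in varlist:
            row = dict(zip(groupby, keys))
            row["variable"] = var
            row["n"] = len(grp)                        # group size, missing included
            row["mean"] = grp[var].mean()              # NaN values are skipped
            row["missing_rate"] = grp[var].isna().mean()
            for p in q:
                row[f"p{int(round(p * 100))}"] = grp[var].quantile(p)
            rows.append(row)
    return pd.DataFrame(rows)


# Toy data: -999 is a made-up sentinel for "missing".
df = pd.DataFrame({
    "gender": ["F", "F", "F", "M", "M", "M"],
    "age": [20, 30, -999, 40, 50, 60],
})
report = grouped_summary(df, ["age"], ["gender"], spec_missing_value=-999, q=[0.5])
```

Replacing the sentinel before aggregating matters: left in place, the -999 would drag the mean and lower quantiles of the first group down instead of being counted in the missing rate.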

get_distribution_shift_single_var

get_distribution_shift_single_var(data, var, grp_name, benchmark_value, outlier_value=0.99)

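Compute the distribution shift of a single variable.

For each group, computes the proportion of observations of the variable that exceed the outlier threshold derived from the benchmark group.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame.

required
var str

Name of the variable to analyze.

required
grp_name str

Name of the grouping variable.

required
benchmark_value any

Value of the grouping variable that identifies the benchmark group.

required
outlier_value float

Quantile used to determine the outlier threshold. Default is 0.99.

0.99

Returns:

Type Description
dict

Maps each group value to the proportion of observations above the threshold.

Examples:

>>> result = get_distribution_shift_single_var(df, 'age', 'gender', 'Male')
Source code in Modeling_Tool/Feature/Distribution_Tool.py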
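def get_distribution_shift_single_var(data, var, grp_name, benchmark_value, outlier_value=0.99):
    """Compute the distribution shift of a single variable.

    For each group, computes the proportion of observations of the variable
    that exceed the outlier threshold derived from the benchmark group.

    Parameters
    ----------
    data : pd.DataFrame
        Input DataFrame.
    var : str
        Name of the variable to analyze.
    grp_name : str
        Name of the grouping variable.
    benchmark_value : any
        Value of the grouping variable that identifies the benchmark group.
    outlier_value : float, optional
        Quantile used to determine the outlier threshold. Default is 0.99.

    Returns
    -------
    dict
        Maps each group value to the proportion of observations above the threshold.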

    Examples
    --------
    >>> result = get_distribution_shift_single_var(df, 'age', 'gender', 'Male')
    """
    analyzer = DistributionShiftAnalyzer(data, grp_name, benchmark_value)
    return analyzer.analyze_single_var(var, outlier_value)
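
To make the calculation concrete, here is a minimal pandas sketch of the idea described above: take the benchmark group's quantile as a threshold, then measure the share of each group above it. The implementation of `DistributionShiftAnalyzer` is not shown in this section, so the function name `share_above_benchmark_quantile`, the strict `>` comparison, and the treatment of missing values are all assumptions of this sketch.

```python
import pandas as pd


def share_above_benchmark_quantile(data, var, grp_name, benchmark_value, outlier_value=0.99):
    """Share of each group's observations above the benchmark group's quantile."""
    # Threshold: the requested quantile of the variable in the benchmark group.
    benchmark = data.loc[data[grp_name] == benchmark_value, var]
    threshold = benchmark.quantile(outlier_value)

    # Assumption: strictly greater than the threshold counts as exceeding.
    # Missing values compare as False, so they stay in the denominator.
    exceeds = data[var] > threshold
    return exceeds.groupby(data[grp_name]).mean().to_dict()


# Toy data: January is the benchmark; February scores are shifted upward.
df = pd.DataFrame({
    "month": ["2023-01"] * 5 + ["2023-02"] * 5,
    "score": [1, 2, 3, 4, 5, 3, 4, 5, 6, 7],
})
shift = share_above_benchmark_quantile(df, "score", "month", "2023-01", outlier_value=0.5)
```

Here the January median is 3, so 2 of 5 January scores and 4 of 5 February scores exceed it. With the default quantile of 0.99, roughly 1% of the benchmark group lies above the threshold, so a noticeably larger share in another group suggests its upper tail has shifted.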

get_distribution_shift

get_distribution_shift(data, varlist, grp_name, benchmark_value, outlier_value=0.99)

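Compute the distribution shift of multiple variables.

For every variable in the list, computes the proportion of observations in each group that exceed the benchmark group's outlier threshold, and returns all results in a transposed DataFrame.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame.

required
varlist list

Names of the variables to analyze.

required
grp_name str

Name of the grouping variable.

required
benchmark_value any

Value of the grouping variable that identifies the benchmark group.

required
outlier_value float

Quantile used to determine the outlier threshold. Default is 0.99.

0.99

Returns:

Type Description
DataFrame

Indexed by variable name, with one column per group value; each cell is the proportion of observations above the threshold.

Examples:

>>> result = get_distribution_shift(df, ['age', 'score'], 'gender', 'Male')
Source code in Modeling_Tool/Feature/Distribution_Tool.py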
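def get_distribution_shift(data, varlist, grp_name, benchmark_value, outlier_value=0.99):
    """Compute the distribution shift of multiple variables.

    For every variable in the list, computes the proportion of observations in
    each group that exceed the benchmark group's outlier threshold, and returns
    all results in a transposed DataFrame.

    Parameters
    ----------
    data : pd.DataFrame
        Input DataFrame.
    varlist : list
        Names of the variables to analyze.
    grp_name : str
        Name of the grouping variable.
    benchmark_value : any
        Value of the grouping variable that identifies the benchmark group.
    outlier_value : float, optional
        Quantile used to determine the outlier threshold. Default is 0.99.

    Returns
    -------
    pd.DataFrame
        Indexed by variable name, with one column per group value; each cell
        is the proportion of observations above the threshold.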

    Examples
    --------
    >>> result = get_distribution_shift(df, ['age', 'score'], 'gender', 'Male')
    """
    analyzer = DistributionShiftAnalyzer(data, grp_name, benchmark_value)
    return analyzer.analyze(varlist, outlier_value)
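
The shape of the returned table (variables as rows, groups as columns) can be illustrated with a vectorized pandas sketch. This is not the library's implementation, which is not shown in this section: `shift_table` is a hypothetical name, and the strict `>` comparison against the benchmark quantile is an assumption.

```python
import pandas as pd


def shift_table(data, varlist, grp_name, benchmark_value, outlier_value=0.99):
    """Rows: variables. Columns: groups. Cells: share above the benchmark quantile."""
    is_benchmark = data[grp_name] == benchmark_value
    # One threshold per variable, computed on the benchmark group only.
    thresholds = data.loc[is_benchmark, varlist].quantile(outlier_value)
    # Compare every column with its own threshold (assumption: strict ">").
    exceeds = data[varlist].gt(thresholds, axis="columns")
    # Group means give one row per group; transpose so variables become rows.
    return exceeds.groupby(data[grp_name]).mean().T


# Toy data: January is the benchmark group.
df = pd.DataFrame({
    "month": ["2023-01"] * 4 + ["2023-02"] * 4,
    "age": [20, 30, 40, 50, 30, 40, 50, 60],
    "score": [1.0, 2.0, 3.0, 4.0, 1.0, 1.0, 2.0, 2.0],
})
table = shift_table(df, ["age", "score"], "month", "2023-01", outlier_value=0.5)
```

In this toy data, `age` drifts upward in February (three of four values exceed the January median of 35) while `score` drifts downward (none exceed the January median of 2.5), so the two rows move in opposite directions relative to the benchmark column.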

plot_distribution

plot_distribution(data, score, method='displot', title='Distribution Plot', figsize=(15, 15), nbins=10)

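Draw a variable's distribution plot.

Draws the variable's distribution using the specified method; kernel density estimates, histograms, and rug plots are supported.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame.

required
score str

Name of the variable whose distribution is plotted.

required
method str

Plotting method: 'rugplot', 'kdeplot', or 'displot'. Default is 'displot'.

'displot'
title str

Figure title. Default is "Distribution Plot".

'Distribution Plot'
figsize tuple

Figure size. Default is (15, 15).

(15, 15)
nbins int

Number of histogram bins (used only by the 'displot' method). Default is 10.

10

Returns:

Type Description
None

The plot is displayed directly.

Examples:

>>> plot_distribution(df, 'age', method='kdeplot')
>>> plot_distribution(df, 'score', method='displot', nbins=20)
Source code in Modeling_Tool/Feature/Distribution_Tool.py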
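def plot_distribution(data, score, method='displot', title="Distribution Plot", figsize=(15, 15), nbins=10):
    """Draw a variable's distribution plot.

    Draws the variable's distribution using the specified method; kernel
    density estimates, histograms, and rug plots are supported.

    Parameters
    ----------
    data : pd.DataFrame
        Input DataFrame.
    score : str
        Name of the variable whose distribution is plotted.
    method : str, optional
        Plotting method: 'rugplot', 'kdeplot', or 'displot'. Default is 'displot'.
    title : str, optional
        Figure title. Default is "Distribution Plot".
    figsize : tuple, optional
        Figure size. Default is (15, 15).
    nbins : int, optional
        Number of histogram bins (used only by the 'displot' method). Default is 10.

    Returns
    -------
    None
        The plot is displayed directly.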

    Examples
    --------
    >>> plot_distribution(df, 'age', method='kdeplot')
    >>> plot_distribution(df, 'score', method='displot', nbins=20)
    """
    plotter = DistributionPlotter(data, score)
    plotter.plot(method=method, title=title, figsize=figsize, nbins=nbins)