跳转至

Modeling_Tool.Eval

模型评估层 —— Gains 表、ROC/KS、链式评估流水线。传入 weight_colsample_weight 时走加权路径;未传则与历史未加权行为一致。

用户指南:模型评估 — 样本权重评估

加权评估实现 — weighted_eval_utils

内部加权 ROC / Gains / 性能汇总实现;公开 API 在检测到权重参数时自动委托此模块。

weighted_eval_utils

Native weighted evaluation helpers.

Shared implementation for public Eval APIs when sample_weight or weight_col is supplied. Unweighted callers keep using the historical implementations in evaluate_model.py and Model_Eval_Tool.py.

cross_risk_weighted_mean

cross_risk_weighted_mean(data, agg_col, sample_weight, score_list, margin_name='Total_Avg_Risk')

Weighted-mean cross-risk table after bin columns are assigned.

源代码位于: Modeling_Tool/Eval/weighted_eval_utils.py
def cross_risk_weighted_mean(data, agg_col, sample_weight, score_list, margin_name="Total_Avg_Risk"):
    """Weighted-mean cross-risk table after bin columns are assigned."""
    weight = np.asarray(sample_weight, dtype=float)
    values = pd.to_numeric(data[agg_col], errors="coerce").to_numpy(dtype=float)
    frame = data[["_bin_num1", "_bin_range1", "_bin_num2", "_bin_range2"]].copy()
    frame["_w"] = weight
    frame["_wv"] = values * weight

    numerator = pd.crosstab(
        [frame["_bin_num1"], frame["_bin_range1"]],
        [frame["_bin_num2"], frame["_bin_range2"]],
        values=frame["_wv"],
        aggfunc="sum",
        margins=True,
        margins_name=margin_name,
        rownames=[score_list[0], score_list[0]],
        colnames=[score_list[1], score_list[1]],
    )
    denominator = pd.crosstab(
        [frame["_bin_num1"], frame["_bin_range1"]],
        [frame["_bin_num2"], frame["_bin_range2"]],
        values=frame["_w"],
        aggfunc="sum",
        margins=True,
        margins_name=margin_name,
        rownames=[score_list[0], score_list[0]],
        colnames=[score_list[1], score_list[1]],
    )
    return numerator / denominator.replace(0, np.nan)

模型评估主控 — Model_Eval_Tool

Model_Eval_Tool

GainsTableCalculator

收益表计算器。

整合了收益表计算的多种功能,支持基础收益表和自定义指标收益表。 提供面向对象的接口进行分组收益表计算。

参数:

名称 类型 描述 默认
data DataFrame

输入数据表

必需
dep str

目标变量名

必需
nbins int

分箱数量

10
precision int

边界值精度

5
min_bin_prop float

每箱最小样本占比

0.05
include_missing bool

是否包含缺失值

True
score str

分数字段名

None
model sklearn-like model

机器学习模型

None
varlist list

模型特征列表

None
equal_freq bool

True为等频分箱

True
chi2_method bool

是否使用卡方分箱

False
chi2_p float

卡方检验显著性水平

0.95
init_equi_bins int

初始等频分箱数量

100
fillna any

缺失值填充值

-999999
spec_values list

特殊值列表

[]
tree_binning bool

是否使用决策树分箱

False
random_state int

随机种子

42
ascending bool

分箱顺序是否升序

False

示例:

>>> calc = GainsTableCalculator(data, dep='target', score='score', nbins=10)
>>> result = calc.calculate(grp_name='region')
源代码位于: Modeling_Tool/Eval/Model_Eval_Tool.py
class GainsTableCalculator:
    """
    收益表计算器。

    整合了收益表计算的多种功能,支持基础收益表和自定义指标收益表。
    提供面向对象的接口进行分组收益表计算。

    Parameters
    ----------
    data : pandas.DataFrame
        输入数据表
    dep : str
        目标变量名
    nbins : int, default 10
        分箱数量
    precision : int, default 5
        边界值精度
    min_bin_prop : float, default 0.05
        每箱最小样本占比
    include_missing : bool, default True
        是否包含缺失值
    score : str, optional
        分数字段名
    model : sklearn-like model, optional
        机器学习模型
    varlist : list, optional
        模型特征列表
    equal_freq : bool, default True
        True为等频分箱
    chi2_method : bool, default False
        是否使用卡方分箱
    chi2_p : float, default 0.95
        卡方检验显著性水平
    init_equi_bins : int, default 100
        初始等频分箱数量
    fillna : any, default -999999
        缺失值填充值
    spec_values : list, default []
        特殊值列表
    tree_binning : bool, default False
        是否使用决策树分箱
    random_state : int, default 42
        随机种子
    ascending : bool, default False
        分箱顺序是否升序

    Examples
    --------
    >>> calc = GainsTableCalculator(data, dep='target', score='score', nbins=10)
    >>> result = calc.calculate(grp_name='region')
    """

    def __init__(self, data, dep, nbins = 10, precision = 5, min_bin_prop = 0.05,
                 include_missing = True, score = None, model = None, varlist = None,
                 equal_freq = True, chi2_method = False, chi2_p = 0.95, 
                 init_equi_bins = 100, fillna = -999999, spec_values = [],
                 tree_binning = False, random_state = 42, ascending = False,
                 weight_col = None, weighted_binning = None):
        """
        初始化收益表计算器。

        Parameters
        ----------
        data : pandas.DataFrame
            输入数据表
        dep : str
            目标变量名
        nbins : int, default 10
            分箱数量
        precision : int, default 5
            边界值精度
        min_bin_prop : float, default 0.05
            每箱最小样本占比
        include_missing : bool, default True
            是否包含缺失值
        score : str, optional
            分数字段名
        model : sklearn-like model, optional
            机器学习模型
        varlist : list, optional
            模型特征列表
        equal_freq : bool, default True
            True为等频分箱
        chi2_method : bool, default False
            是否使用卡方分箱
        chi2_p : float, default 0.95
            卡方检验显著性水平
        init_equi_bins : int, default 100
            初始等频分箱数量
        fillna : any, default -999999
            缺失值填充值
        spec_values : list, default []
            特殊值列表
        tree_binning : bool, default False
            是否使用决策树分箱
        random_state : int, default 42
            随机种子
        ascending : bool, default False
            分箱顺序是否升序
        weight_col : str, optional
            样本权重列名(须在 ``data`` 中)
        weighted_binning : bool, optional
            True 时按累计权重等频分箱
        """
        self.data = data
        self.dep = dep
        self.nbins = nbins
        self.precision = precision
        self.min_bin_prop = min_bin_prop
        self.include_missing = include_missing
        self.score = score
        self.model = model
        self.varlist = varlist
        self.equal_freq = equal_freq
        self.chi2_method = chi2_method
        self.chi2_p = chi2_p
        self.init_equi_bins = init_equi_bins
        self.fillna = fillna
        self.spec_values = spec_values
        self.tree_binning = tree_binning
        self.random_state = random_state
        self.ascending = ascending
        self.weight_col = weight_col
        self.weighted_binning = weighted_binning

    def calculate(self, grp_name = None, min_data_size = 100, grp_colname = None,
                  sync_range = True, retSummary = False, withSummary = False,
                  wholeGroup = False, add_func = None, weight_col = None):
        """
        计算收益表。

        Parameters
        ----------
        grp_name : str, optional
            分组字段名
        min_data_size : int, default 100
            每组最小样本数
        grp_colname : str, optional
            分组结果列名
        sync_range : bool, default True
            是否同步分箱边界
        retSummary : bool, default False
            是否只返回汇总指标
        withSummary : bool, default False
            是否包含总体汇总行
        wholeGroup : bool, default False
            是否使用全部数据分组
        add_func : callable, optional
            自定义统计函数

        Returns
        -------
        pandas.DataFrame
            收益表
        """
        return get_gains_table(
            data = self.data,
            dep = self.dep,
            nbins = self.nbins,
            precision = self.precision,
            min_bin_prop = self.min_bin_prop,
            include_missing = self.include_missing,
            score = self.score,
            model = self.model,
            varlist = self.varlist,
            equal_freq = self.equal_freq,
            chi2_method = self.chi2_method,
            grp_name = grp_name,
            min_data_size = min_data_size,
            grp_colname = grp_colname,
            sync_range = sync_range,
            chi2_p = self.chi2_p,
            init_equi_bins = self.init_equi_bins,
            fillna = self.fillna,
            spec_values = self.spec_values,
            retSummary = retSummary,
            tree_binning = self.tree_binning,
            random_state = self.random_state,
            ascending = self.ascending,
            withSummary = withSummary,
            wholeGroup = wholeGroup,
            add_func = add_func,
            weight_col = self.weight_col if weight_col is None else weight_col,
            weighted_binning = self.weighted_binning,
        )

calculate

calculate(grp_name=None, min_data_size=100, grp_colname=None, sync_range=True, retSummary=False, withSummary=False, wholeGroup=False, add_func=None, weight_col=None)

计算收益表。

参数:

名称 类型 描述 默认
grp_name str

分组字段名

None
min_data_size int

每组最小样本数

100
grp_colname str

分组结果列名

None
sync_range bool

是否同步分箱边界

True
retSummary bool

是否只返回汇总指标

False
withSummary bool

是否包含总体汇总行

False
wholeGroup bool

是否使用全部数据分组

False
add_func callable

自定义统计函数

None

返回:

类型 描述
DataFrame

收益表

源代码位于: Modeling_Tool/Eval/Model_Eval_Tool.py
def calculate(self, grp_name = None, min_data_size = 100, grp_colname = None,
              sync_range = True, retSummary = False, withSummary = False,
              wholeGroup = False, add_func = None, weight_col = None):
    """
    计算收益表。

    Parameters
    ----------
    grp_name : str, optional
        分组字段名
    min_data_size : int, default 100
        每组最小样本数
    grp_colname : str, optional
        分组结果列名
    sync_range : bool, default True
        是否同步分箱边界
    retSummary : bool, default False
        是否只返回汇总指标
    withSummary : bool, default False
        是否包含总体汇总行
    wholeGroup : bool, default False
        是否使用全部数据分组
    add_func : callable, optional
        自定义统计函数

    Returns
    -------
    pandas.DataFrame
        收益表
    """
    return get_gains_table(
        data = self.data,
        dep = self.dep,
        nbins = self.nbins,
        precision = self.precision,
        min_bin_prop = self.min_bin_prop,
        include_missing = self.include_missing,
        score = self.score,
        model = self.model,
        varlist = self.varlist,
        equal_freq = self.equal_freq,
        chi2_method = self.chi2_method,
        grp_name = grp_name,
        min_data_size = min_data_size,
        grp_colname = grp_colname,
        sync_range = sync_range,
        chi2_p = self.chi2_p,
        init_equi_bins = self.init_equi_bins,
        fillna = self.fillna,
        spec_values = self.spec_values,
        retSummary = retSummary,
        tree_binning = self.tree_binning,
        random_state = self.random_state,
        ascending = self.ascending,
        withSummary = withSummary,
        wholeGroup = wholeGroup,
        add_func = add_func,
        weight_col = self.weight_col if weight_col is None else weight_col,
        weighted_binning = self.weighted_binning,
    )

PerformanceEvaluator

性能评估器。

整合了模型性能评估的多种功能,支持多数据集、多分组的性能评估。 提供面向对象的接口进行性能指标计算和汇总。

参数:

名称 类型 描述 默认
tgt_name str or list of str

目标变量名。可传入多个 y 标签的 list/tuple → 逐标签评估后纵向拼接 (输出新增 tgt_name 列), 并为每个标签各自出图。

必需
scr_name str

分数字段名

None
model sklearn-like model

机器学习模型

None
feature_cols list

模型特征列表

None
dist_bins int

分布分箱数

20
pct_bins int

百分比分箱数

10
precision int

边界值精度

5
min_bin_prop float

每箱最小样本占比

0.05
include_missing bool

是否包含缺失值

False
equal_freq bool

True为等频分箱

True
chi2_method bool

是否使用卡方分箱

False
init_equi_bins int

初始等频分箱数量

1000
chi2_p float

卡方检验显著性水平

0.9
tree_binning bool

是否使用决策树分箱

False
random_state int

随机种子

42
weight_col str

默认权重列;各 add_dataset 也可单独指定

None

示例:

>>> evaluator = PerformanceEvaluator(tgt_name='target', model=model, feature_cols=features)
>>> evaluator.add_dataset('train', train_df)
>>> evaluator.add_dataset('validation', val_df)
>>> evaluator.add_dataset('oot', oot_df)
>>> result = evaluator.evaluate()
>>>
>>> # 多 y 标签: tgt_name 传 list → 输出含 tgt_name 列的纵向拼接表, 每个标签各自出图
>>> evaluator = PerformanceEvaluator(tgt_name=['bad_dpd7', 'bad_dpd30'], model=model, feature_cols=features)
>>> evaluator.add_dataset('train', train_df).add_dataset('oot', oot_df)
>>> result = evaluator.evaluate(to_show=True)
源代码位于: Modeling_Tool/Eval/Model_Eval_Tool.py
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
class PerformanceEvaluator:
    """
    性能评估器。

    整合了模型性能评估的多种功能,支持多数据集、多分组的性能评估。
    提供面向对象的接口进行性能指标计算和汇总。

    Parameters
    ----------
    tgt_name : str or list of str
        目标变量名。可传入多个 y 标签的 list/tuple → 逐标签评估后纵向拼接 (输出新增 tgt_name 列),
        并为每个标签各自出图。
    scr_name : str, optional
        分数字段名
    model : sklearn-like model, optional
        机器学习模型
    feature_cols : list, optional
        模型特征列表
    dist_bins : int, default 20
        分布分箱数
    pct_bins : int, default 10
        百分比分箱数
    precision : int, default 5
        边界值精度
    min_bin_prop : float, default 0.05
        每箱最小样本占比
    include_missing : bool, default False
        是否包含缺失值
    equal_freq : bool, default True
        True为等频分箱
    chi2_method : bool, default False
        是否使用卡方分箱
    init_equi_bins : int, default 1000
        初始等频分箱数量
    chi2_p : float, default 0.9
        卡方检验显著性水平
    tree_binning : bool, default False
        是否使用决策树分箱
    random_state : int, default 42
        随机种子
    weight_col : str, optional
        默认权重列;各 ``add_dataset`` 也可单独指定

    Examples
    --------
    >>> evaluator = PerformanceEvaluator(tgt_name='target', model=model, feature_cols=features)
    >>> evaluator.add_dataset('train', train_df)
    >>> evaluator.add_dataset('validation', val_df)
    >>> evaluator.add_dataset('oot', oot_df)
    >>> result = evaluator.evaluate()
    >>>
    >>> # 多 y 标签: tgt_name 传 list → 输出含 tgt_name 列的纵向拼接表, 每个标签各自出图
    >>> evaluator = PerformanceEvaluator(tgt_name=['bad_dpd7', 'bad_dpd30'], model=model, feature_cols=features)
    >>> evaluator.add_dataset('train', train_df).add_dataset('oot', oot_df)
    >>> result = evaluator.evaluate(to_show=True)
    """

    def __init__(self, tgt_name, scr_name = None, model = None, feature_cols = None,
                 dist_bins = 20, pct_bins = 10, precision = 5, min_bin_prop = 0.05,
                 include_missing = False, equal_freq = True, chi2_method = False,
                 init_equi_bins = 1000, chi2_p = 0.9, tree_binning = False, random_state = 42,
                 weight_col = None):
        """
        初始化性能评估器。

        Parameters
        ----------
        tgt_name : str or list of str
            目标变量名。可传入多个 y 标签的 list/tuple → 逐标签评估后纵向拼接 (输出新增 tgt_name 列),
            并为每个标签各自出图。
        scr_name : str, optional
            分数字段名
        model : sklearn-like model, optional
            机器学习模型
        feature_cols : list, optional
            模型特征列表
        dist_bins : int, default 20
            分布分箱数
        pct_bins : int, default 10
            百分比分箱数
        precision : int, default 5
            边界值精度
        min_bin_prop : float, default 0.05
            每箱最小样本占比
        include_missing : bool, default False
            是否包含缺失值
        equal_freq : bool, default True
            True为等频分箱
        chi2_method : bool, default False
            是否使用卡方分箱
        init_equi_bins : int, default 1000
            初始等频分箱数量
        chi2_p : float, default 0.9
            卡方检验显著性水平
        tree_binning : bool, default False
            是否使用决策树分箱
        random_state : int, default 42
            随机种子
        weight_col : str, optional
            默认权重列;各 ``add_dataset`` 也可单独指定
        """
        self.tgt_name = tgt_name
        self.scr_name = scr_name
        self.model = model
        self.feature_cols = feature_cols
        self.dist_bins = dist_bins
        self.pct_bins = pct_bins
        self.precision = precision
        self.min_bin_prop = min_bin_prop
        self.include_missing = include_missing
        self.equal_freq = equal_freq
        self.chi2_method = chi2_method
        self.init_equi_bins = init_equi_bins
        self.chi2_p = chi2_p
        self.tree_binning = tree_binning
        self.random_state = random_state
        self.weight_col = weight_col
        self.datasets = {}
        self.dataset_weight_cols = {}

    def add_dataset(self, name, data, weight_col = None):
        """
        添加数据集。

        Parameters
        ----------
        name : str
            数据集名称(如'train'、'validation'、'oot')
        data : pandas.DataFrame
            数据集

        Returns
        -------
        self
            返回自身以便链式调用
        """
        self.datasets[name] = data
        self.dataset_weight_cols[name] = weight_col
        return self

    def evaluate(self, oot_grp_name = None, min_data_size = 100, grp_colname = None,
                 fig_save_path = None, rpt_save_path = None, to_show = False, 
                 display = True, gains_table = False, benchmark_dataset = None,
                 weight_col = None):
        """
        执行性能评估。

        Parameters
        ----------
        oot_grp_name : str, optional
            oot分组字段名
        min_data_size : int, default 100
            每组最小样本数
        grp_colname : str, optional
            分组结果列名
        fig_save_path : str, optional
            图片保存路径
        rpt_save_path : str, optional
            报告保存路径
        to_show : bool, default False
            是否显示图形
        display : bool, default True
            是否打印结果
        gains_table : bool, default True
            是否计算收益表
        benchmark_dataset : str or pandas.DataFrame, optional
            固定分箱边界的基准数据集。若传入str, 则从add_dataset添加的数据集中按名称获取;
            若传入DataFrame, 则直接使用该DataFrame。默认None表示各数据集独立分箱。

        Returns
        -------
        pandas.DataFrame
            性能评估汇总表。若实例的 ``tgt_name`` 为多标签 list/tuple, 则对每个标签分别评估,
            结果新增 ``tgt_name`` 列后纵向拼接; ``to_show=True`` 时每个标签各出一张图,
            ``fig_save_path`` 自动按标签加后缀 (如 ``perf.png`` → ``perf_<label>.png``)。
        """
        if len(self.datasets) == 0:
            return pd.DataFrame([])

        if self.scr_name is None and self.model is None and self.feature_cols is None:
            return -1

        if self.scr_name is None and self.model is None:
            return -2

        if self.scr_name is None and self.feature_cols is None:
            return -3

        active_weight_col = weight_col or self.weight_col
        has_dataset_weight = any(v is not None for v in self.dataset_weight_cols.values())
        if (active_weight_col is not None or has_dataset_weight) and oot_grp_name is None and benchmark_dataset is None and not isinstance(self.tgt_name, (list, tuple)):
            rows = []
            for name, data in self.datasets.items():
                if data is None:
                    continue
                wc = self.dataset_weight_cols.get(name) or active_weight_col
                work_data = data.copy()
                scr_name = self.scr_name
                if scr_name is None:
                    scr_name = "_mdl_scr"
                    work_data[scr_name] = self.model.predict_proba(work_data.loc[:, self.feature_cols])[:, 1]
                rows.append(
                    _weighted_eval.dataset_summary(
                        name,
                        work_data,
                        self.tgt_name,
                        scr_name,
                        weight_col=wc,
                        nbins=self.pct_bins,
                    )
                )
            fnl_df = pd.DataFrame(rows)
            if display:
                from IPython.display import display as _ipy_display
                _ipy_display(fnl_df)
            if fig_save_path:
                eval_datasets = {}
                for name, data in self.datasets.items():
                    if data is None:
                        continue
                    work_data = data.copy()
                    scr_name = self.scr_name
                    if scr_name is None:
                        scr_name = "_mdl_scr"
                        work_data[scr_name] = self.model.predict_proba(work_data.loc[:, self.feature_cols])[:, 1]
                    eval_datasets[name] = {
                        "y_true": work_data[self.tgt_name],
                        "y_score": work_data[scr_name],
                    }
                if eval_datasets:
                    evaluate_performance(
                        datasets=eval_datasets,
                        dist_bins=self.dist_bins,
                        pct_bins=self.pct_bins,
                        square_figsize=5,
                        to_show=to_show,
                        save_path=fig_save_path,
                        gains_table=gains_table,
                        equal_freq=self.equal_freq,
                    )
            if rpt_save_path:
                fnl_df.to_csv(rpt_save_path, index=False)
            return fnl_df

        # ── 多 y 标签支持: tgt_name 为 list/tuple 时, 逐标签评估后纵向拼接 ──
        #    表格: 每个标签结果新增 tgt_name 列后纵向拼接;
        #    图片: 每个标签各自出图 (to_show=True 时循环显示, fig_save_path 自动按标签加后缀)。
        if isinstance(self.tgt_name, (list, tuple)):
            import os as _os

            def _suffix_path(p, lab):
                if not p:
                    return None
                root, ext = _os.path.splitext(str(p))
                return "{0}_{1}{2}".format(root, lab, ext)

            _orig_tgt = self.tgt_name
            multi_results = []
            try:
                for _t in list(self.tgt_name):
                    self.tgt_name = _t
                    sub_df = self.evaluate(
                        oot_grp_name = oot_grp_name,
                        min_data_size = min_data_size,
                        grp_colname = grp_colname,
                        fig_save_path = _suffix_path(fig_save_path, _t),
                        rpt_save_path = None,        # 拼接后统一保存 (见下)
                        to_show = to_show,           # 每个标签各自出图
                        display = False,             # 拼接后统一展示
                        gains_table = gains_table,
                        benchmark_dataset = benchmark_dataset,
                    )
                    if isinstance(sub_df, pd.DataFrame) and sub_df.shape[0] > 0:
                        sub_df.insert(0, "tgt_name", _t)
                        multi_results.append(sub_df)
            finally:
                self.tgt_name = _orig_tgt

            fnl_df = pd.concat(multi_results, ignore_index = True) if multi_results else pd.DataFrame([])

            if display:
                from IPython.display import display as _ipy_display
                _ipy_display(fnl_df)
            if rpt_save_path:
                fnl_df.to_csv(rpt_save_path, index = False)
            return fnl_df

        def _get_score(data):
            if self.scr_name is not None:
                return data[self.scr_name]
            return self.model.predict_proba(data.loc[:, self.feature_cols])[:, 1]

        def _get_benchmark_bin_edges():
            if benchmark_dataset is None:
                return None

            if isinstance(benchmark_dataset, str):
                if benchmark_dataset not in self.datasets:
                    raise ValueError("benchmark_dataset must be one of added datasets: {0}".format(list(self.datasets.keys())))
                benchmark_data = self.datasets[benchmark_dataset]
            else:
                benchmark_data = benchmark_dataset

            benchmark_score = "_benchmark_score"
            benchmark_df = pd.DataFrame({
                self.tgt_name: benchmark_data[self.tgt_name],
                benchmark_score: _get_score(benchmark_data)
            })

            _, benchmark_bin_edges = super_binning(
                data = benchmark_df,
                score = benchmark_score,
                dep = self.tgt_name,
                nbins = self.pct_bins,
                precision = self.precision,
                min_bin_prop = self.min_bin_prop,
                include_missing = self.include_missing,
                equal_freq = self.equal_freq,
                chi2_method = self.chi2_method,
                chi2_p = self.chi2_p,
                init_equi_bins = self.init_equi_bins,
                tree_binning = self.tree_binning,
                random_state = self.random_state,
                return_edges = True,
                ascending = True
            )

            if benchmark_bin_edges is None or len(benchmark_bin_edges) < 2:
                raise ValueError("Cannot generate benchmark bin edges from benchmark_dataset.")

            return list(benchmark_bin_edges)

        benchmark_bin_edges = _get_benchmark_bin_edges()

        def _evaluate_dataset_dict(dataset_dict, save_path = None):
            eval_datasets = {}
            for name, data in dataset_dict.items():
                if data is None:
                    continue
                eval_datasets[name] = {
                    "y_true": data[self.tgt_name],
                    "y_score": _get_score(data)
                }

            if len(eval_datasets) == 0:
                return pd.DataFrame([])

            model_eval_result_df = evaluate_performance(
                datasets = eval_datasets,
                dist_bins = self.dist_bins,
                pct_bins = self.pct_bins,
                square_figsize = 5,
                to_show = to_show,
                save_path = save_path,
                gains_table = gains_table,
                equal_freq = self.equal_freq,
                pct_bin_edges = benchmark_bin_edges
            )

            import re
            btm_cols = [x for x in model_eval_result_df.columns if x.startswith("Btm")]
            top_cols = [x for x in model_eval_result_df.columns if x.startswith("Top")]

            if btm_cols and top_cols:
                btm_str = btm_cols[0]
                top_str = top_cols[0]
                numbers = re.findall(r'\d+', btm_str)
                if numbers:
                    quantile = int(numbers[0])
                else:
                    quantile = self.pct_bins

                model_eval_result_df[f"Btm{quantile}%_Lift"] = model_eval_result_df[btm_str] / model_eval_result_df["avgTrue"]
                model_eval_result_df[f"Top{quantile}%_Lift"] = model_eval_result_df[top_str] / model_eval_result_df["avgTrue"]
                model_eval_result_df["AUC_Shift"] = model_eval_result_df["AUC"].shift(1) / model_eval_result_df["AUC"] - 1
                model_eval_result_df["KS_Shift"] = model_eval_result_df["KS"].shift(1) / model_eval_result_df["KS"] - 1

            gains_table_cols = ['N_BUMP', 'MIN_RISK_DEP', 'MAX_RISK_DEP', 'KS_IN_GAINS', 'LIFT_IN_GAINS', 'IV', 'N_BINS']
            gains_summ_list = []
            for name, data in dataset_dict.items():
                if data is None:
                    gains_res = pd.DataFrame([], columns = gains_table_cols)
                else:
                    gains_res = get_gains_table(
                        data = data,
                        dep = self.tgt_name,
                        nbins = benchmark_bin_edges if benchmark_bin_edges is not None else self.pct_bins,
                        precision = self.precision,
                        min_bin_prop = self.min_bin_prop,
                        include_missing = self.include_missing,
                        score = self.scr_name,
                        equal_freq = self.equal_freq,
                        chi2_method = False if benchmark_bin_edges is not None else self.chi2_method,
                        model = self.model,
                        varlist = self.feature_cols,
                        init_equi_bins = self.init_equi_bins,
                        chi2_p = self.chi2_p,
                        retSummary = True,
                        tree_binning = False if benchmark_bin_edges is not None else self.tree_binning,
                        random_state = self.random_state
                    )
                gains_res['index'] = name
                gains_summ_list.append(gains_res)

            if gains_summ_list:
                gains_summ = pd.concat(gains_summ_list)
                model_eval_result_df = model_eval_result_df.merge(gains_summ, on = ['index'], how = 'left')

            return model_eval_result_df

        if oot_grp_name is None:
            fnl_df = _evaluate_dataset_dict(self.datasets, save_path = fig_save_path)
        else:
            if grp_colname is None:
                grp_colname = oot_grp_name

            grouped_results = []
            for dataset_name, data in self.datasets.items():
                if data is None or oot_grp_name not in data.columns:
                    continue

                grp_list = data[oot_grp_name].sort_values(ascending = True).unique().tolist()
                valid_grp_list = [x for x in grp_list if data[data[oot_grp_name].isin([x])].shape[0] >= min_data_size]

                for grp in valid_grp_list:
                    data_grp = data[data[oot_grp_name].isin([grp])]
                    perf_res = _evaluate_dataset_dict({dataset_name: data_grp}, save_path = None)
                    if perf_res.shape[0] > 0:
                        perf_res[grp_colname] = grp
                        grouped_results.append(perf_res)

            if len(grouped_results) == 0:
                fnl_df = pd.DataFrame([])
            else:
                fnl_df = pd.concat(grouped_results, ignore_index = True)

        if display:
            from IPython.display import display
            display(fnl_df)

        if rpt_save_path:
            fnl_df.to_csv(rpt_save_path, index = False)

        return fnl_df

add_dataset

add_dataset(name, data, weight_col=None)

添加数据集。

参数:

名称 类型 描述 默认
name str

数据集名称(如'train'、'validation'、'oot')

必需
data DataFrame

数据集

必需

返回:

类型 描述
self

返回自身以便链式调用

源代码位于: Modeling_Tool/Eval/Model_Eval_Tool.py
def add_dataset(self, name, data, weight_col = None):
    """
    添加数据集。

    Parameters
    ----------
    name : str
        数据集名称(如'train'、'validation'、'oot')
    data : pandas.DataFrame
        数据集

    Returns
    -------
    self
        返回自身以便链式调用
    """
    self.datasets[name] = data
    self.dataset_weight_cols[name] = weight_col
    return self

evaluate

evaluate(oot_grp_name=None, min_data_size=100, grp_colname=None, fig_save_path=None, rpt_save_path=None, to_show=False, display=True, gains_table=False, benchmark_dataset=None, weight_col=None)

执行性能评估。

参数:

名称 类型 描述 默认
oot_grp_name str

oot分组字段名

None
min_data_size int

每组最小样本数

100
grp_colname str

分组结果列名

None
fig_save_path str

图片保存路径

None
rpt_save_path str

报告保存路径

None
to_show bool

是否显示图形

False
display bool

是否打印结果

True
gains_table bool

是否计算收益表

True
benchmark_dataset str or DataFrame

固定分箱边界的基准数据集。若传入str, 则从add_dataset添加的数据集中按名称获取; 若传入DataFrame, 则直接使用该DataFrame。默认None表示各数据集独立分箱。

None

返回:

类型 描述
DataFrame

性能评估汇总表。若实例的 tgt_name 为多标签 list/tuple, 则对每个标签分别评估, 结果新增 tgt_name 列后纵向拼接; to_show=True 时每个标签各出一张图, fig_save_path 自动按标签加后缀 (如 perf.pngperf_<label>.png)。

源代码位于: Modeling_Tool/Eval/Model_Eval_Tool.py
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
def evaluate(self, oot_grp_name = None, min_data_size = 100, grp_colname = None,
             fig_save_path = None, rpt_save_path = None, to_show = False, 
             display = True, gains_table = False, benchmark_dataset = None,
             weight_col = None):
    """
    执行性能评估。

    Parameters
    ----------
    oot_grp_name : str, optional
        oot分组字段名
    min_data_size : int, default 100
        每组最小样本数
    grp_colname : str, optional
        分组结果列名
    fig_save_path : str, optional
        图片保存路径
    rpt_save_path : str, optional
        报告保存路径
    to_show : bool, default False
        是否显示图形
    display : bool, default True
        是否打印结果
    gains_table : bool, default True
        是否计算收益表
    benchmark_dataset : str or pandas.DataFrame, optional
        固定分箱边界的基准数据集。若传入str, 则从add_dataset添加的数据集中按名称获取;
        若传入DataFrame, 则直接使用该DataFrame。默认None表示各数据集独立分箱。

    Returns
    -------
    pandas.DataFrame
        性能评估汇总表。若实例的 ``tgt_name`` 为多标签 list/tuple, 则对每个标签分别评估,
        结果新增 ``tgt_name`` 列后纵向拼接; ``to_show=True`` 时每个标签各出一张图,
        ``fig_save_path`` 自动按标签加后缀 (如 ``perf.png`` → ``perf_<label>.png``)。
    """
    if len(self.datasets) == 0:
        return pd.DataFrame([])

    if self.scr_name is None and self.model is None and self.feature_cols is None:
        return -1

    if self.scr_name is None and self.model is None:
        return -2

    if self.scr_name is None and self.feature_cols is None:
        return -3

    active_weight_col = weight_col or self.weight_col
    has_dataset_weight = any(v is not None for v in self.dataset_weight_cols.values())
    if (active_weight_col is not None or has_dataset_weight) and oot_grp_name is None and benchmark_dataset is None and not isinstance(self.tgt_name, (list, tuple)):
        rows = []
        for name, data in self.datasets.items():
            if data is None:
                continue
            wc = self.dataset_weight_cols.get(name) or active_weight_col
            work_data = data.copy()
            scr_name = self.scr_name
            if scr_name is None:
                scr_name = "_mdl_scr"
                work_data[scr_name] = self.model.predict_proba(work_data.loc[:, self.feature_cols])[:, 1]
            rows.append(
                _weighted_eval.dataset_summary(
                    name,
                    work_data,
                    self.tgt_name,
                    scr_name,
                    weight_col=wc,
                    nbins=self.pct_bins,
                )
            )
        fnl_df = pd.DataFrame(rows)
        if display:
            from IPython.display import display as _ipy_display
            _ipy_display(fnl_df)
        if fig_save_path:
            eval_datasets = {}
            for name, data in self.datasets.items():
                if data is None:
                    continue
                work_data = data.copy()
                scr_name = self.scr_name
                if scr_name is None:
                    scr_name = "_mdl_scr"
                    work_data[scr_name] = self.model.predict_proba(work_data.loc[:, self.feature_cols])[:, 1]
                eval_datasets[name] = {
                    "y_true": work_data[self.tgt_name],
                    "y_score": work_data[scr_name],
                }
            if eval_datasets:
                evaluate_performance(
                    datasets=eval_datasets,
                    dist_bins=self.dist_bins,
                    pct_bins=self.pct_bins,
                    square_figsize=5,
                    to_show=to_show,
                    save_path=fig_save_path,
                    gains_table=gains_table,
                    equal_freq=self.equal_freq,
                )
        if rpt_save_path:
            fnl_df.to_csv(rpt_save_path, index=False)
        return fnl_df

    # ── 多 y 标签支持: tgt_name 为 list/tuple 时, 逐标签评估后纵向拼接 ──
    #    表格: 每个标签结果新增 tgt_name 列后纵向拼接;
    #    图片: 每个标签各自出图 (to_show=True 时循环显示, fig_save_path 自动按标签加后缀)。
    if isinstance(self.tgt_name, (list, tuple)):
        import os as _os

        def _suffix_path(p, lab):
            if not p:
                return None
            root, ext = _os.path.splitext(str(p))
            return "{0}_{1}{2}".format(root, lab, ext)

        _orig_tgt = self.tgt_name
        multi_results = []
        try:
            for _t in list(self.tgt_name):
                self.tgt_name = _t
                sub_df = self.evaluate(
                    oot_grp_name = oot_grp_name,
                    min_data_size = min_data_size,
                    grp_colname = grp_colname,
                    fig_save_path = _suffix_path(fig_save_path, _t),
                    rpt_save_path = None,        # 拼接后统一保存 (见下)
                    to_show = to_show,           # 每个标签各自出图
                    display = False,             # 拼接后统一展示
                    gains_table = gains_table,
                    benchmark_dataset = benchmark_dataset,
                )
                if isinstance(sub_df, pd.DataFrame) and sub_df.shape[0] > 0:
                    sub_df.insert(0, "tgt_name", _t)
                    multi_results.append(sub_df)
        finally:
            self.tgt_name = _orig_tgt

        fnl_df = pd.concat(multi_results, ignore_index = True) if multi_results else pd.DataFrame([])

        if display:
            from IPython.display import display as _ipy_display
            _ipy_display(fnl_df)
        if rpt_save_path:
            fnl_df.to_csv(rpt_save_path, index = False)
        return fnl_df

    def _get_score(data):
        if self.scr_name is not None:
            return data[self.scr_name]
        return self.model.predict_proba(data.loc[:, self.feature_cols])[:, 1]

    def _get_benchmark_bin_edges():
        if benchmark_dataset is None:
            return None

        if isinstance(benchmark_dataset, str):
            if benchmark_dataset not in self.datasets:
                raise ValueError("benchmark_dataset must be one of added datasets: {0}".format(list(self.datasets.keys())))
            benchmark_data = self.datasets[benchmark_dataset]
        else:
            benchmark_data = benchmark_dataset

        benchmark_score = "_benchmark_score"
        benchmark_df = pd.DataFrame({
            self.tgt_name: benchmark_data[self.tgt_name],
            benchmark_score: _get_score(benchmark_data)
        })

        _, benchmark_bin_edges = super_binning(
            data = benchmark_df,
            score = benchmark_score,
            dep = self.tgt_name,
            nbins = self.pct_bins,
            precision = self.precision,
            min_bin_prop = self.min_bin_prop,
            include_missing = self.include_missing,
            equal_freq = self.equal_freq,
            chi2_method = self.chi2_method,
            chi2_p = self.chi2_p,
            init_equi_bins = self.init_equi_bins,
            tree_binning = self.tree_binning,
            random_state = self.random_state,
            return_edges = True,
            ascending = True
        )

        if benchmark_bin_edges is None or len(benchmark_bin_edges) < 2:
            raise ValueError("Cannot generate benchmark bin edges from benchmark_dataset.")

        return list(benchmark_bin_edges)

    benchmark_bin_edges = _get_benchmark_bin_edges()

    def _evaluate_dataset_dict(dataset_dict, save_path = None):
        eval_datasets = {}
        for name, data in dataset_dict.items():
            if data is None:
                continue
            eval_datasets[name] = {
                "y_true": data[self.tgt_name],
                "y_score": _get_score(data)
            }

        if len(eval_datasets) == 0:
            return pd.DataFrame([])

        model_eval_result_df = evaluate_performance(
            datasets = eval_datasets,
            dist_bins = self.dist_bins,
            pct_bins = self.pct_bins,
            square_figsize = 5,
            to_show = to_show,
            save_path = save_path,
            gains_table = gains_table,
            equal_freq = self.equal_freq,
            pct_bin_edges = benchmark_bin_edges
        )

        import re
        btm_cols = [x for x in model_eval_result_df.columns if x.startswith("Btm")]
        top_cols = [x for x in model_eval_result_df.columns if x.startswith("Top")]

        if btm_cols and top_cols:
            btm_str = btm_cols[0]
            top_str = top_cols[0]
            numbers = re.findall(r'\d+', btm_str)
            if numbers:
                quantile = int(numbers[0])
            else:
                quantile = self.pct_bins

            model_eval_result_df[f"Btm{quantile}%_Lift"] = model_eval_result_df[btm_str] / model_eval_result_df["avgTrue"]
            model_eval_result_df[f"Top{quantile}%_Lift"] = model_eval_result_df[top_str] / model_eval_result_df["avgTrue"]
            model_eval_result_df["AUC_Shift"] = model_eval_result_df["AUC"].shift(1) / model_eval_result_df["AUC"] - 1
            model_eval_result_df["KS_Shift"] = model_eval_result_df["KS"].shift(1) / model_eval_result_df["KS"] - 1

        gains_table_cols = ['N_BUMP', 'MIN_RISK_DEP', 'MAX_RISK_DEP', 'KS_IN_GAINS', 'LIFT_IN_GAINS', 'IV', 'N_BINS']
        gains_summ_list = []
        for name, data in dataset_dict.items():
            if data is None:
                gains_res = pd.DataFrame([], columns = gains_table_cols)
            else:
                gains_res = get_gains_table(
                    data = data,
                    dep = self.tgt_name,
                    nbins = benchmark_bin_edges if benchmark_bin_edges is not None else self.pct_bins,
                    precision = self.precision,
                    min_bin_prop = self.min_bin_prop,
                    include_missing = self.include_missing,
                    score = self.scr_name,
                    equal_freq = self.equal_freq,
                    chi2_method = False if benchmark_bin_edges is not None else self.chi2_method,
                    model = self.model,
                    varlist = self.feature_cols,
                    init_equi_bins = self.init_equi_bins,
                    chi2_p = self.chi2_p,
                    retSummary = True,
                    tree_binning = False if benchmark_bin_edges is not None else self.tree_binning,
                    random_state = self.random_state
                )
            gains_res['index'] = name
            gains_summ_list.append(gains_res)

        if gains_summ_list:
            gains_summ = pd.concat(gains_summ_list)
            model_eval_result_df = model_eval_result_df.merge(gains_summ, on = ['index'], how = 'left')

        return model_eval_result_df

    if oot_grp_name is None:
        fnl_df = _evaluate_dataset_dict(self.datasets, save_path = fig_save_path)
    else:
        if grp_colname is None:
            grp_colname = oot_grp_name

        grouped_results = []
        for dataset_name, data in self.datasets.items():
            if data is None or oot_grp_name not in data.columns:
                continue

            grp_list = data[oot_grp_name].sort_values(ascending = True).unique().tolist()
            valid_grp_list = [x for x in grp_list if data[data[oot_grp_name].isin([x])].shape[0] >= min_data_size]

            for grp in valid_grp_list:
                data_grp = data[data[oot_grp_name].isin([grp])]
                perf_res = _evaluate_dataset_dict({dataset_name: data_grp}, save_path = None)
                if perf_res.shape[0] > 0:
                    perf_res[grp_colname] = grp
                    grouped_results.append(perf_res)

        if len(grouped_results) == 0:
            fnl_df = pd.DataFrame([])
        else:
            fnl_df = pd.concat(grouped_results, ignore_index = True)

    if display:
        from IPython.display import display
        display(fnl_df)

    if rpt_save_path:
        fnl_df.to_csv(rpt_save_path, index = False)

    return fnl_df

get_gains_table

get_gains_table(data, dep, nbins=10, precision=5, min_bin_prop=0.05, include_missing=True, score=None, model=None, varlist=None, equal_freq=True, chi2_method=False, grp_name=None, min_data_size=100, grp_colname=None, sync_range=True, chi2_p=0.95, init_equi_bins=100, fillna=-999999, spec_values=[], retSummary=False, tree_binning=False, random_state=42, ascending=False, withSummary=False, wholeGroup=False, add_func=None, weight_col=None, weighted_binning=None)

计算分组收益表。

根据分组字段对数据进行分组,分别计算每个分组的收益表。 若未指定分组字段,则计算整体收益表。

参数:

名称 类型 描述 默认
data DataFrame

输入数据表

必需
dep str

目标变量名

必需
nbins int

分箱数量

10
precision int

边界值精度

5
min_bin_prop float

每箱最小样本占比

0.05
include_missing bool

是否包含缺失值

True
score str

分数字段名

None
model sklearn-like model

机器学习模型

None
varlist list

模型特征列表

None
equal_freq bool

True为等频分箱

True
chi2_method bool

是否使用卡方分箱

False
grp_name str

分组字段名

None
min_data_size int

每组最小样本数

100
grp_colname str

分组结果列名

None
sync_range bool

是否同步分箱边界

True
chi2_p float

卡方检验显著性水平

0.95
init_equi_bins int

初始等频分箱数量

100
fillna any

缺失值填充值

-999999
spec_values list

特殊值列表

[]
retSummary bool

是否只返回汇总指标

False
tree_binning bool

是否使用决策树分箱

False
random_state int

随机种子

42
ascending bool

分箱顺序是否升序

False
withSummary bool

是否包含总体汇总行

False
wholeGroup bool

是否使用全部数据分组

False
add_func callable

自定义统计函数

None
weight_col str

样本权重列名;提供且无 grp_name 时按权重聚合 Gains(输出 N 为权重和、N_RAW 为行数)

None
weighted_binning bool

True 时按累计权重等频分箱;False 时按行数(默认)

None

返回:

类型 描述
DataFrame

分组收益表

源代码位于: Modeling_Tool/Eval/Model_Eval_Tool.py
def get_gains_table(data, dep, nbins = 10, precision = 5, min_bin_prop = 0.05, include_missing = True, 
                    score = None, model = None, varlist = None, equal_freq = True, chi2_method = False,
                    grp_name = None, min_data_size = 100, grp_colname = None, sync_range = True,
                    chi2_p = 0.95, init_equi_bins = 100, fillna = -999999, spec_values = [], retSummary = False,
                    tree_binning = False, random_state=42, ascending = False, withSummary = False, wholeGroup = False, 
                    add_func = None, weight_col = None, weighted_binning = None):
    """
    计算分组收益表。

    根据分组字段对数据进行分组,分别计算每个分组的收益表。
    若未指定分组字段,则计算整体收益表。

    Parameters
    ----------
    data : pandas.DataFrame
        输入数据表
    dep : str
        目标变量名
    nbins : int, default 10
        分箱数量
    precision : int, default 5
        边界值精度
    min_bin_prop : float, default 0.05
        每箱最小样本占比
    include_missing : bool, default True
        是否包含缺失值
    score : str, optional
        分数字段名
    model : sklearn-like model, optional
        机器学习模型
    varlist : list, optional
        模型特征列表
    equal_freq : bool, default True
        True为等频分箱
    chi2_method : bool, default False
        是否使用卡方分箱
    grp_name : str, optional
        分组字段名
    min_data_size : int, default 100
        每组最小样本数
    grp_colname : str, optional
        分组结果列名
    sync_range : bool, default True
        是否同步分箱边界
    chi2_p : float, default 0.95
        卡方检验显著性水平
    init_equi_bins : int, default 100
        初始等频分箱数量
    fillna : any, default -999999
        缺失值填充值
    spec_values : list, default []
        特殊值列表
    retSummary : bool, default False
        是否只返回汇总指标
    tree_binning : bool, default False
        是否使用决策树分箱
    random_state : int, default 42
        随机种子
    ascending : bool, default False
        分箱顺序是否升序
    withSummary : bool, default False
        是否包含总体汇总行
    wholeGroup : bool, default False
        是否使用全部数据分组
    add_func : callable, optional
        自定义统计函数
    weight_col : str, optional
        样本权重列名;提供且无 ``grp_name`` 时按权重聚合 Gains(输出 ``N`` 为权重和、``N_RAW`` 为行数)
    weighted_binning : bool, optional
        True 时按累计权重等频分箱;False 时按行数(默认)

    Returns
    -------
    pandas.DataFrame
        分组收益表
    """

    if weight_col is not None and grp_name is None:
        work_data = data.copy()
        work_score = score
        if work_score is None:
            if model is None or varlist is None:
                return _get_gains_table_single(
                    data=data,
                    dep=dep,
                    nbins=nbins,
                    precision=precision,
                    min_bin_prop=min_bin_prop,
                    include_missing=include_missing,
                    score=score,
                    model=model,
                    varlist=varlist,
                    equal_freq=equal_freq,
                    chi2_method=chi2_method,
                    chi2_p=chi2_p,
                    init_equi_bins=init_equi_bins,
                    fillna=fillna,
                    spec_values=spec_values,
                    retSummary=retSummary,
                    tree_binning=tree_binning,
                    random_state=random_state,
                    ascending=ascending,
                    withSummary=withSummary,
                    add_func=add_func,
                )
            work_score = "_mdl_scr"
            work_data[work_score] = model.predict_proba(work_data.loc[:, varlist])[:, 1]
        gains = _weighted_eval.get_gains_table(
            work_data,
            dep,
            work_score,
            nbins=nbins,
            weight_col=weight_col,
            weighted_binning=weighted_binning,
        )
        if retSummary:
            return pd.DataFrame({
                "N_BUMP": [gains["RANK_ORDER_BUMP"].sum()],
                "MIN_RISK_DEP": [gains["TRUE_BAD_SHIFT"].round(4).min()],
                "MAX_RISK_DEP": [gains["TRUE_BAD_SHIFT"].round(4).max()],
                "KS_IN_GAINS": [gains["KS_PER_BIN"].round(4).max()],
                "LIFT_IN_GAINS": [gains["LIFT"].round(4).max()],
                "IV": [gains["IV"].replace([np.inf, -np.inf], 0).sum()],
                "N_BINS": [gains.shape[0]],
            })
        return gains

    if grp_colname is None:
        grp_colname = grp_name

    if grp_name is None:
        fnl_df = _get_gains_table_single(data = data, 
                                         dep = dep, 
                                         nbins = nbins, 
                                         precision = precision, 
                                         min_bin_prop = min_bin_prop, 
                                         include_missing = include_missing, 
                                         score = score, 
                                         model = model, 
                                         varlist = varlist, 
                                         equal_freq = equal_freq, 
                                         chi2_method = chi2_method,
                                         chi2_p = chi2_p, 
                                         init_equi_bins = init_equi_bins, 
                                         fillna = fillna, 
                                         spec_values = spec_values,
                                         retSummary = retSummary, 
                                         tree_binning = tree_binning, 
                                         random_state = random_state,
                                         ascending = ascending,
                                         withSummary = withSummary,
                                         add_func = add_func)
        return fnl_df

    withSummary = False
    grp_list = data[grp_name].sort_values(ascending = True).unique().tolist()
    valid_grp_list = [x for x in grp_list if data[data[grp_name].isin([x])].shape[0] >= min_data_size]

    if len(valid_grp_list) == 0:
        fnl_df = _get_gains_table_single(data = data, 
                                         dep = dep, 
                                         nbins = nbins, 
                                         precision = precision, 
                                         min_bin_prop = min_bin_prop, 
                                         include_missing = include_missing, 
                                         score = score, 
                                         model = model, 
                                         varlist = varlist, 
                                         equal_freq = equal_freq, 
                                         chi2_method = chi2_method,
                                         chi2_p = chi2_p, 
                                         init_equi_bins = init_equi_bins, 
                                         fillna = fillna, 
                                         spec_values = spec_values,
                                         retSummary = retSummary, 
                                         tree_binning = tree_binning, 
                                         random_state = random_state,
                                         ascending = ascending,
                                         withSummary = withSummary,
                                         add_func = add_func)
        fnl_df[grp_colname] = np.nan
        col_index = pd.MultiIndex.from_tuples([], names=['_bin_num', '_bin_range'])
        fnl_df = pd.DataFrame([], columns = fnl_df.columns, index = col_index)
        return fnl_df

    first_grp = valid_grp_list[0]
    data_grp = data[data[grp_name].isin([first_grp])]

    grp_for_binning = data_grp if not wholeGroup else data

    if sync_range and retSummary:

        full_gains = _get_gains_table_single(data = grp_for_binning, 
                                            dep = dep, 
                                            nbins = nbins, 
                                            precision = precision, 
                                            min_bin_prop = min_bin_prop, 
                                            include_missing = include_missing, 
                                            score = score, 
                                            model = model, 
                                            varlist = varlist, 
                                            equal_freq = equal_freq, 
                                            chi2_method = chi2_method,
                                            chi2_p = chi2_p, 
                                            init_equi_bins = init_equi_bins, 
                                            fillna = fillna, 
                                            spec_values = spec_values,
                                            retSummary = False, 
                                            tree_binning = tree_binning, 
                                            random_state = random_state,
                                            ascending = ascending,
                                            withSummary = withSummary,
                                            add_func = add_func)

        bin_range = get_bin_range_list(full_gains.reset_index())
        bin_range = bin_range + [-np.inf, np.inf]
        bin_range = sorted(list(set(bin_range)))

        fnl_df = _get_gains_table_single(data = data_grp, 
                                         dep = dep, 
                                         nbins = nbins, 
                                         precision = precision, 
                                         min_bin_prop = min_bin_prop, 
                                         include_missing = include_missing, 
                                         score = score, 
                                         model = model, 
                                         varlist = varlist, 
                                         equal_freq = equal_freq, 
                                         chi2_method = chi2_method,
                                         chi2_p = chi2_p, 
                                         init_equi_bins = init_equi_bins, 
                                         fillna = fillna, 
                                         spec_values = spec_values,
                                         retSummary = retSummary, 
                                         tree_binning = tree_binning, 
                                         random_state = random_state,
                                         ascending = ascending,
                                         withSummary = withSummary,
                                         add_func = add_func)

        fnl_df[grp_colname] = first_grp

        nbins = bin_range

        equal_freq = False
        chi2_method = False

    if sync_range and not retSummary:

        fnl_df = _get_gains_table_single(data = data_grp, 
                                        dep = dep, 
                                        nbins = nbins, 
                                        precision = precision, 
                                        min_bin_prop = min_bin_prop, 
                                        include_missing = include_missing, 
                                        score = score, 
                                        model = model, 
                                        varlist = varlist, 
                                        equal_freq = equal_freq, 
                                        chi2_method = chi2_method,
                                        chi2_p = chi2_p, 
                                        init_equi_bins = init_equi_bins, 
                                        fillna = fillna, 
                                        spec_values = spec_values,
                                        retSummary = retSummary, 
                                        tree_binning = tree_binning, 
                                        random_state = random_state,
                                        ascending = ascending,
                                        withSummary = withSummary,
                                        add_func = add_func)

        fnl_df[grp_colname] = first_grp

        nbins = get_bin_range_list(fnl_df.reset_index())
        nbins = nbins + [-np.inf, np.inf]
        nbins = sorted(list(set(nbins)))

        equal_freq = False
        chi2_method = False

    if (not sync_range and not retSummary) or (not sync_range and retSummary):

        fnl_df = _get_gains_table_single(data = data_grp, 
                                         dep = dep, 
                                         nbins = nbins, 
                                         precision = precision, 
                                         min_bin_prop = min_bin_prop, 
                                         include_missing = include_missing, 
                                         score = score, 
                                         model = model, 
                                         varlist = varlist, 
                                         equal_freq = equal_freq, 
                                         chi2_method = chi2_method,
                                         chi2_p = chi2_p, 
                                         init_equi_bins = init_equi_bins, 
                                         fillna = fillna, 
                                         spec_values = spec_values,
                                         retSummary = retSummary, 
                                         tree_binning = tree_binning, 
                                         random_state = random_state,
                                         ascending = ascending,
                                         withSummary = withSummary,
                                         add_func = add_func)

        fnl_df[grp_colname] = first_grp

    i = 1
    while i < len(valid_grp_list):

        grp = grp_list[i]
        data_grp = data[data[grp_name].isin([grp])]

        perf_res = _get_gains_table_single(data = data_grp, 
                                           dep = dep, 
                                           nbins = nbins, 
                                           precision = precision, 
                                           min_bin_prop = min_bin_prop, 
                                           include_missing = include_missing, 
                                           score = score, 
                                           model = model, 
                                           varlist = varlist, 
                                           equal_freq = equal_freq, 
                                           chi2_method = chi2_method,
                                           chi2_p = chi2_p, 
                                           init_equi_bins = init_equi_bins, 
                                           fillna = fillna, 
                                           spec_values = spec_values,
                                           retSummary = retSummary, 
                                           tree_binning = tree_binning, 
                                           random_state = random_state,
                                           ascending = ascending,
                                           withSummary = withSummary,
                                           add_func = add_func)

        perf_res[grp_colname] = grp

        fnl_df = pd.concat([fnl_df, perf_res])
        i += 1

    return fnl_df

get_perf_summary

get_perf_summary(train, validation, oot, tgt_name, scr_name=None, model=None, feature_cols=None, fig_save_path=None, rpt_save_path=None, to_show=False, display=True, dist_bins=20, pct_bins=10, precision=5, min_bin_prop=0.05, include_missing=False, equal_freq=True, chi2_method=False, init_equi_bins=1000, chi2_p=0.9, oot_grp_name=None, min_data_size=100, grp_colname=None, tree_binning=False, random_state=42, gains_table=False, weight_col=None)

计算分组性能评估汇总。

对训练集、验证集和oot样本进行分组性能评估,根据oot_grp_name 字段分组后分别计算各组的性能指标。

参数:

名称 类型 描述 默认
train DataFrame

训练数据集

必需
validation DataFrame

验证数据集

必需
oot DataFrame

oot数据集

必需
tgt_name str

目标变量名

必需
scr_name str

分数字段名

None
model sklearn-like model

机器学习模型

None
feature_cols list

模型特征列表

None
fig_save_path str

图片保存路径

None
rpt_save_path str

报告保存路径

None
to_show bool

是否显示图形

False
display bool

是否打印结果

True
dist_bins int

分布分箱数

20
pct_bins int

百分比分箱数

10
precision int

边界值精度

5
min_bin_prop float

每箱最小样本占比

0.05
include_missing bool

是否包含缺失值

False
equal_freq bool

True为等频分箱

True
chi2_method bool

是否使用卡方分箱

False
init_equi_bins int

初始等频分箱数量

1000
chi2_p float

卡方检验显著性水平

0.9
oot_grp_name str

oot分组字段名

None
min_data_size int

每组最小样本数

100
grp_colname str

分组结果列名

None
tree_binning bool

是否使用决策树分箱

False
random_state int

随机种子

42
gains_table bool

是否计算收益表

True
weight_col str

各数据集 DataFrame 中的样本权重列;无分组时用于加权 AUC/KS 等指标

None

返回:

类型 描述
DataFrame

分组性能评估汇总表

源代码位于: Modeling_Tool/Eval/Model_Eval_Tool.py
def get_perf_summary(train, validation, oot, tgt_name, 
                     scr_name = None,
                     model = None, 
                     feature_cols = None, 
                     fig_save_path = None, 
                     rpt_save_path = None,
                     to_show = False, 
                     display = True,
                     dist_bins = 20, 
                     pct_bins = 10, 
                     precision = 5, 
                     min_bin_prop = 0.05,
                     include_missing = False, 
                     equal_freq = True,
                     chi2_method = False, 
                     init_equi_bins = 1000, 
                     chi2_p = 0.9,
                     oot_grp_name = None, 
                     min_data_size = 100, 
                     grp_colname = None,
                     tree_binning = False, 
                     random_state = 42,
                     gains_table = False,
                     weight_col = None):
    """
    计算分组性能评估汇总。

    对训练集、验证集和oot样本进行分组性能评估,根据oot_grp_name
    字段分组后分别计算各组的性能指标。

    Parameters
    ----------
    train : pandas.DataFrame, optional
        训练数据集
    validation : pandas.DataFrame, optional
        验证数据集
    oot : pandas.DataFrame, optional
        oot数据集
    tgt_name : str
        目标变量名
    scr_name : str, optional
        分数字段名
    model : sklearn-like model, optional
        机器学习模型
    feature_cols : list, optional
        模型特征列表
    fig_save_path : str, optional
        图片保存路径
    rpt_save_path : str, optional
        报告保存路径
    to_show : bool, default False
        是否显示图形
    display : bool, default True
        是否打印结果
    dist_bins : int, default 20
        分布分箱数
    pct_bins : int, default 10
        百分比分箱数
    precision : int, default 5
        边界值精度
    min_bin_prop : float, default 0.05
        每箱最小样本占比
    include_missing : bool, default False
        是否包含缺失值
    equal_freq : bool, default True
        True为等频分箱
    chi2_method : bool, default False
        是否使用卡方分箱
    init_equi_bins : int, default 1000
        初始等频分箱数量
    chi2_p : float, default 0.9
        卡方检验显著性水平
    oot_grp_name : str, optional
        oot分组字段名
    min_data_size : int, default 100
        每组最小样本数
    grp_colname : str, optional
        分组结果列名
    tree_binning : bool, default False
        是否使用决策树分箱
    random_state : int, default 42
        随机种子
    gains_table : bool, default True
        是否计算收益表
    weight_col : str, optional
        各数据集 DataFrame 中的样本权重列;无分组时用于加权 AUC/KS 等指标

    Returns
    -------
    pandas.DataFrame
        分组性能评估汇总表
    """

    if weight_col is not None and oot_grp_name is None:
        return _weighted_eval.get_perf_summary(
            train=train,
            validation=validation,
            oot=oot,
            tgt_name=tgt_name,
            scr_name=scr_name,
            weight_col=weight_col,
            nbins=pct_bins,
        )

    if grp_colname is None:
        grp_colname = oot_grp_name

    if oot_grp_name is None:
        fnl_df = _get_perf_summary_single(train = train, 
                                          validation = validation, 
                                          oot = oot, 
                                          scr_name = scr_name,
                                          model = model, 
                                          tgt_name = tgt_name, 
                                          display=display,
                                          feature_cols=feature_cols, 
                                          to_show=to_show,
                                          fig_save_path=fig_save_path,
                                          rpt_save_path=rpt_save_path,
                                          dist_bins=dist_bins, 
                                          pct_bins=pct_bins,
                                          precision = precision, 
                                          min_bin_prop = min_bin_prop,
                                          include_missing = include_missing, 
                                          equal_freq = equal_freq,
                                          chi2_method = chi2_method, 
                                          init_equi_bins = init_equi_bins, 
                                          chi2_p = chi2_p, 
                                          tree_binning = tree_binning,
                                          random_state = random_state,
                                          gains_table = gains_table)
        return fnl_df

    grp_list = oot[oot_grp_name].sort_values(ascending = True).unique().tolist()
    valid_grp_list = [x for x in grp_list if oot[oot[oot_grp_name].isin([x])].shape[0] >= min_data_size]

    if len(valid_grp_list) == 0:
        return pd.DataFrame([])

    first_grp = valid_grp_list[0]
    oot_grp = oot[oot[oot_grp_name].isin([first_grp])]

    fnl_df = _get_perf_summary_single(train = train, 
                                      validation = validation, 
                                      oot = oot_grp, 
                                      scr_name = scr_name,
                                      model = model, 
                                      tgt_name = tgt_name, 
                                      display=display,
                                      feature_cols=feature_cols, 
                                      to_show=to_show,
                                      fig_save_path=fig_save_path,
                                      rpt_save_path=rpt_save_path,
                                      dist_bins=dist_bins, 
                                      pct_bins=pct_bins,
                                      precision = precision, 
                                      min_bin_prop = min_bin_prop,
                                      include_missing = include_missing, 
                                      equal_freq = equal_freq,
                                      chi2_method = chi2_method, 
                                      init_equi_bins = init_equi_bins, 
                                      chi2_p = chi2_p,
                                      tree_binning = tree_binning,
                                      random_state = random_state,
                                      gains_table = gains_table)
    fnl_df[grp_colname] = first_grp

    i = 1
    while i < len(valid_grp_list):

        grp = grp_list[i]
        oot_grp = oot[oot[oot_grp_name].isin([grp])]

#         print(grp)

        perf_res = _get_perf_summary_single(train = train, 
                                           validation = validation, 
                                           oot = oot_grp, 
                                           scr_name = scr_name,
                                           model = model, 
                                           tgt_name = tgt_name, 
                                           display=display,
                                           feature_cols=feature_cols, 
                                           to_show=to_show,
                                           fig_save_path=fig_save_path,
                                           rpt_save_path=rpt_save_path,
                                           dist_bins=dist_bins, 
                                           pct_bins=pct_bins,
                                           precision = precision, 
                                           min_bin_prop = min_bin_prop,
                                           include_missing = include_missing, 
                                           equal_freq = equal_freq,
                                           chi2_method = chi2_method, 
                                           init_equi_bins = init_equi_bins, 
                                           chi2_p = chi2_p,
                                           tree_binning = tree_binning,
                                           random_state = random_state,
                                           gains_table = gains_table)
        perf_res[grp_colname] = grp

        fnl_df = pd.concat([fnl_df, perf_res])
        i += 1

    return fnl_df

cross_risk

cross_risk(data, score_list, dep, nbins, agg_col=None, precision=5, min_bin_prop=0.05, include_missing=False, equal_freq=True, binning_numeric=[True, True], agg_func='mean', chi2_method=False, chi2_p=0.95, init_equi_bins=100, fillna=-999999, spec_values=[], tree_binning=False, random_state=42, weight_col=None, sample_weight=None, wgt_col=None)

创建交叉风险表。

对两个分数字段进行分箱后,计算交叉分组的风险聚合值。 支持对数值型变量进行自动分箱。

参数:

名称 类型 描述 默认
data DataFrame

输入数据表

必需
score_list list

分数字段列表,长度为2

必需
dep str

目标变量名

必需
nbins int or list

分箱数量(整数或长度为2的列表)

必需
agg_col str

聚合列名,默认为dep

None
precision int or list

边界值精度

5
min_bin_prop float or list

每箱最小样本占比

0.05
include_missing bool

是否包含缺失值

False
equal_freq bool

True为等频分箱

True
binning_numeric list

是否对数值型字段分箱

[True, True]
agg_func (str, callable, tuple or dict)

聚合函数。

常规用法与 pandas.crosstabaggfunc 一致,例如 'mean''sum''count' 或自定义函数。

也支持直接计算两个字段聚合后的比值:

  1. 简写形式: agg_col=(numerator_col, denominator_col), agg_func='ratio'
  2. tuple形式: agg_func=('ratio', numerator_col, denominator_col)
  3. dict形式: agg_func={ 'func': 'ratio', 'numerator': numerator_col, 'denominator': denominator_col, 'numerator_agg': 'sum', 'denominator_agg': 'sum', 'return_count': True, 'return_count_pct': True, 'count_name': 'N', 'count_pct_name': 'N_Pct', 'ratio_name': 'ratio', 'valid_only': True }

return_count=True 时,返回结果的 columns 会增加一层指标名, 包含 ratio 矩阵和 N 矩阵;当 return_count_pct=True 时, 额外返回每个格子的 N 占比矩阵,计算方式为: 当前格子 count / 右下角 Total count。

'mean'
chi2_method bool

是否使用卡方分箱

False
chi2_p float

卡方检验显著性水平

0.95
init_equi_bins int

初始等频分箱数量

100
fillna any

缺失值填充值

-999999
spec_values list

特殊值列表

[]
tree_binning bool

是否使用决策树分箱

False
random_state int

随机种子

42
weight_col str

Column in data with per-row sample weights (alias: wgt_col).

None
sample_weight array - like

Explicit per-row weights; must align with len(data).

None
wgt_col str

Alias for weight_col.

None

返回:

类型 描述
DataFrame

交叉风险表

示例:

>>> cross_risk(data, score_list=['score1', 'score2'], dep='target', nbins=10)
源代码位于: Modeling_Tool/Eval/Model_Eval_Tool.py
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
def cross_risk(data, score_list, dep, nbins, agg_col = None, precision = 5, min_bin_prop = 0.05, include_missing = False, 
               equal_freq = True, binning_numeric = [True, True], agg_func = 'mean', chi2_method = False,
               chi2_p = 0.95, init_equi_bins = 100, fillna = -999999, spec_values = [], 
               tree_binning = False, random_state = 42, weight_col = None, sample_weight = None, wgt_col = None):
    """
    创建交叉风险表。

    对两个分数字段进行分箱后,计算交叉分组的风险聚合值。
    支持对数值型变量进行自动分箱。

    Parameters
    ----------
    data : pandas.DataFrame
        输入数据表
    score_list : list
        分数字段列表,长度为2
    dep : str
        目标变量名
    nbins : int or list
        分箱数量(整数或长度为2的列表)
    agg_col : str, optional
        聚合列名,默认为dep
    precision : int or list, default 5
        边界值精度
    min_bin_prop : float or list, default 0.05
        每箱最小样本占比
    include_missing : bool, default False
        是否包含缺失值
    equal_freq : bool, default True
        True为等频分箱
    binning_numeric : list, default [True, True]
        是否对数值型字段分箱
    agg_func : str, callable, tuple or dict, default 'mean'
        聚合函数。

        常规用法与 ``pandas.crosstab`` 的 ``aggfunc`` 一致,例如 ``'mean'``、
        ``'sum'``、``'count'`` 或自定义函数。

        也支持直接计算两个字段聚合后的比值:

        1. 简写形式:
           ``agg_col=(numerator_col, denominator_col), agg_func='ratio'``
        2. tuple形式:
           ``agg_func=('ratio', numerator_col, denominator_col)``
        3. dict形式:
           ``agg_func={
               'func': 'ratio',
               'numerator': numerator_col,
               'denominator': denominator_col,
               'numerator_agg': 'sum',
               'denominator_agg': 'sum',
               'return_count': True,
               'return_count_pct': True,
               'count_name': 'N',
               'count_pct_name': 'N_Pct',
               'ratio_name': 'ratio',
               'valid_only': True
           }``

        当 ``return_count=True`` 时,返回结果的 columns 会增加一层指标名,
        包含 ratio 矩阵和 N 矩阵;当 ``return_count_pct=True`` 时,
        额外返回每个格子的 N 占比矩阵,计算方式为:
        当前格子 count / 右下角 Total count。
    chi2_method : bool, default False
        是否使用卡方分箱
    chi2_p : float, default 0.95
        卡方检验显著性水平
    init_equi_bins : int, default 100
        初始等频分箱数量
    fillna : any, default -999999
        缺失值填充值
    spec_values : list, default []
        特殊值列表
    tree_binning : bool, default False
        是否使用决策树分箱
    random_state : int, default 42
        随机种子
    weight_col : str, optional
        Column in ``data`` with per-row sample weights (alias: ``wgt_col``).
    sample_weight : array-like, optional
        Explicit per-row weights; must align with ``len(data)``.
    wgt_col : str, optional
        Alias for ``weight_col``.

    Returns
    -------
    pandas.DataFrame
        交叉风险表

    Examples
    --------
    >>> cross_risk(data, score_list=['score1', 'score2'], dep='target', nbins=10)
    """

    from pandas.api.types import is_numeric_dtype

    if agg_col is None:
        agg_col = dep

    resolved_weight = None
    if weight_col is not None or sample_weight is not None or wgt_col is not None:
        resolved_weight = _weighted_eval.resolve_weights(
            data=data,
            weight_col=weight_col or wgt_col,
            sample_weight=sample_weight,
            expected_len=len(data),
        )

    def _parse_ratio_agg(agg_col, agg_func):
        """
        Parse ratio aggregation syntax while keeping backward compatibility
        with the original cross_risk agg_func behavior.
        """
        ratio_cfg = None

        if isinstance(agg_func, str) and agg_func.lower() in ["ratio", "rate", "divide"]:
            if not isinstance(agg_col, (list, tuple)) or len(agg_col) != 2:
                raise ValueError(
                    "When agg_func='ratio', `agg_col` must be a two-element "
                    "list/tuple: (numerator_col, denominator_col)."
                )
            ratio_cfg = {
                "numerator": agg_col[0],
                "denominator": agg_col[1],
            }

        elif isinstance(agg_func, (list, tuple)) and len(agg_func) >= 3:
            if str(agg_func[0]).lower() in ["ratio", "rate", "divide"]:
                ratio_cfg = {
                    "numerator": agg_func[1],
                    "denominator": agg_func[2],
                }
                if len(agg_func) >= 4:
                    ratio_cfg["return_count"] = bool(agg_func[3])

        elif isinstance(agg_func, dict):
            func_name = str(
                agg_func.get("func", agg_func.get("type", agg_func.get("agg_func", "")))
            ).lower()
            if func_name in ["ratio", "rate", "divide"]:
                ratio_cfg = dict(agg_func)
                ratio_cfg["numerator"] = ratio_cfg.get(
                    "numerator", ratio_cfg.get("num", ratio_cfg.get("numerator_col"))
                )
                ratio_cfg["denominator"] = ratio_cfg.get(
                    "denominator", ratio_cfg.get("denom", ratio_cfg.get("denominator_col"))
                )

        if ratio_cfg is None:
            return None

        if ratio_cfg.get("numerator") is None or ratio_cfg.get("denominator") is None:
            raise ValueError(
                "Ratio aggregation requires both numerator and denominator columns. "
                "Use agg_func={'func': 'ratio', 'numerator': ..., 'denominator': ...}."
            )

        ratio_cfg.setdefault("numerator_agg", "sum")
        ratio_cfg.setdefault("denominator_agg", "sum")
        ratio_cfg.setdefault("return_count", False)
        ratio_cfg.setdefault("return_count_pct", False)
        ratio_cfg.setdefault("count_name", "N")
        ratio_cfg.setdefault("count_pct_name", "N_Pct")
        ratio_cfg.setdefault("ratio_name", f"{ratio_cfg['numerator']}/{ratio_cfg['denominator']}")
        ratio_cfg.setdefault("valid_only", True)
        ratio_cfg.setdefault("zero_division", np.nan)
        return ratio_cfg

    def _parse_regular_agg(agg_func):
        """
        Parse the extended regular aggregation syntax:
        agg_func={
            'func': ['mean', 'count'],
            'return_count_pct': True,
            'count_func_name': 'count',
            'count_pct_name': 'N_Pct'
        }
        """
        if not isinstance(agg_func, dict):
            return None

        func_name = str(
            agg_func.get("func", agg_func.get("type", agg_func.get("agg_func", "")))
        ).lower()
        if func_name in ["ratio", "rate", "divide"]:
            return None

        if "func" not in agg_func and "agg_func" not in agg_func:
            return None

        regular_cfg = dict(agg_func)
        regular_cfg["func"] = regular_cfg.get("func", regular_cfg.get("agg_func"))
        regular_cfg.setdefault("return_count_pct", False)
        regular_cfg.setdefault("count_func_name", "count")
        regular_cfg.setdefault("count_pct_name", "N_Pct")
        regular_cfg.setdefault("margins_name", "Total_Avg_Risk")
        return regular_cfg

    ratio_cfg = _parse_ratio_agg(agg_col, agg_func)
    regular_cfg = _parse_regular_agg(agg_func) if ratio_cfg is None else None

    if is_numeric_dtype(data[score_list[0]]) and binning_numeric[0]:

        data, edges1 = super_binning(data = data, 
                                     score = score_list[0], 
                                     dep = dep, 
                                     nbins = nbins[0] if isinstance(nbins, list) else nbins, 
                                     precision = precision[0] if isinstance(precision, list) else precision, 
                                     min_bin_prop = min_bin_prop[0] if isinstance(min_bin_prop, list) else min_bin_prop, 
                                     include_missing = include_missing, 
                                     equal_freq = equal_freq, 
                                     chi2_method = chi2_method, 
                                     chi2_p = chi2_p, 
                                     init_equi_bins = init_equi_bins, 
                                     fillna = fillna, 
                                     spec_values = spec_values, 
                                     tree_binning = tree_binning, 
                                     random_state = random_state, 
                                     return_edges = True, 
                                     bin_colnames = ("_bin_num1", "_bin_range1"),
                                     ascending = True)

    else:
        data["_bin_num1"] = data[score_list[0]]
        data["_bin_range1"] = data[score_list[0]]

    if is_numeric_dtype(data[score_list[1]]) and binning_numeric[1]:

        data, edges2 = super_binning(data = data, 
                                     score = score_list[1], 
                                     dep = dep, 
                                     nbins = nbins[1] if isinstance(nbins, list) else nbins, 
                                     precision = precision[1] if isinstance(precision, list) else precision, 
                                     min_bin_prop = min_bin_prop[1] if isinstance(min_bin_prop, list) else min_bin_prop, 
                                     include_missing = include_missing, 
                                     equal_freq = equal_freq, 
                                     chi2_method = chi2_method, 
                                     chi2_p = chi2_p, 
                                     init_equi_bins = init_equi_bins, 
                                     fillna = fillna, 
                                     spec_values = spec_values, 
                                     tree_binning = tree_binning, 
                                     random_state = random_state, 
                                     return_edges = True, 
                                     bin_colnames = ("_bin_num2", "_bin_range2"),
                                     ascending = True)

    else:

        data["_bin_num2"] = data[score_list[1]]
        data["_bin_range2"] = data[score_list[1]]


    if ratio_cfg is not None:
        numerator = ratio_cfg["numerator"]
        denominator = ratio_cfg["denominator"]
        numerator_agg = ratio_cfg["numerator_agg"]
        denominator_agg = ratio_cfg["denominator_agg"]
        return_count = ratio_cfg["return_count"]
        return_count_pct = ratio_cfg["return_count_pct"]
        count_name = ratio_cfg["count_name"]
        count_pct_name = ratio_cfg["count_pct_name"]
        ratio_name = ratio_cfg["ratio_name"]
        valid_only = ratio_cfg["valid_only"]
        zero_division = ratio_cfg["zero_division"]
        margin_name = ratio_cfg.get("margins_name", "Total_Avg_Risk")

        missing_cols = [c for c in [numerator, denominator] if c not in data.columns]
        if len(missing_cols) > 0:
            raise ValueError(f"Columns not found in data for ratio aggregation: {missing_cols}")

        agg_data = data.copy()
        if valid_only:
            agg_data = agg_data.loc[
                agg_data[numerator].notna()
                & agg_data[denominator].notna()
                & (agg_data[denominator] != 0)
            ].copy()

        numerator_res = pd.crosstab([agg_data['_bin_num1'], agg_data['_bin_range1']],
                                    [agg_data['_bin_num2'], agg_data['_bin_range2']],
                                    rownames=[score_list[0], score_list[0]],
                                    colnames=[score_list[1], score_list[1]],
                                    values=agg_data[numerator],
                                    aggfunc=numerator_agg,
                                    margins=True,
                                    margins_name=margin_name)

        denominator_res = pd.crosstab([agg_data['_bin_num1'], agg_data['_bin_range1']],
                                      [agg_data['_bin_num2'], agg_data['_bin_range2']],
                                      rownames=[score_list[0], score_list[0]],
                                      colnames=[score_list[1], score_list[1]],
                                      values=agg_data[denominator],
                                      aggfunc=denominator_agg,
                                      margins=True,
                                      margins_name=margin_name)

        res = numerator_res / denominator_res.replace(0, np.nan)
        if not pd.isna(zero_division):
            res = res.fillna(zero_division)

        if return_count or return_count_pct:
            count_col = ratio_cfg.get("count_col", numerator)
            count_res = pd.crosstab([agg_data['_bin_num1'], agg_data['_bin_range1']],
                                    [agg_data['_bin_num2'], agg_data['_bin_range2']],
                                    rownames=[score_list[0], score_list[0]],
                                    colnames=[score_list[1], score_list[1]],
                                    values=agg_data[count_col],
                                    aggfunc="count",
                                    margins=True,
                                    margins_name=margin_name)
            output_dict = {ratio_name: res}

            if return_count:
                output_dict[count_name] = count_res

            if return_count_pct:
                total_count = count_res.iloc[-1, -1] if count_res.shape[0] > 0 and count_res.shape[1] > 0 else 0
                if total_count == 0 or pd.isna(total_count):
                    count_pct_res = count_res * np.nan
                else:
                    count_pct_res = count_res / total_count
                output_dict[count_pct_name] = count_pct_res

            res = pd.concat(output_dict, axis=1)

        return res

    actual_agg_func = regular_cfg["func"] if regular_cfg is not None else agg_func
    margin_name = regular_cfg.get("margins_name", "Total_Avg_Risk") if regular_cfg is not None else "Total_Avg_Risk"

    if resolved_weight is not None:
        if ratio_cfg is not None or regular_cfg is not None:
            raise ValueError(
                "weighted cross_risk currently supports only simple mean aggregation "
                "(agg_func='mean' without extended ratio/regular dict syntax)."
            )
        agg_name = getattr(actual_agg_func, "__name__", str(actual_agg_func)).lower()
        if agg_name != "mean":
            raise ValueError(
                "weighted cross_risk currently supports only agg_func='mean'; got {0!r}".format(actual_agg_func)
            )
        return _weighted_eval.cross_risk_weighted_mean(
            data=data,
            agg_col=agg_col,
            sample_weight=resolved_weight,
            score_list=score_list,
            margin_name=margin_name,
        )

    res = pd.crosstab([data['_bin_num1'], data['_bin_range1']], 
                        [data['_bin_num2'], data['_bin_range2']], 
                        rownames=[score_list[0], score_list[0]], 
                        colnames=[score_list[1], score_list[1]], 
                        values = data[agg_col], aggfunc = actual_agg_func, 
                        margins = True, margins_name=margin_name)

    if regular_cfg is not None and regular_cfg.get("return_count_pct", False):
        count_func_name = regular_cfg.get("count_func_name", "count")
        count_pct_name = regular_cfg.get("count_pct_name", "N_Pct")

        count_res = None
        if isinstance(res.columns, pd.MultiIndex) and count_func_name in res.columns.get_level_values(0):
            count_res = res[count_func_name]

        if count_res is None:
            count_res = pd.crosstab([data['_bin_num1'], data['_bin_range1']],
                                    [data['_bin_num2'], data['_bin_range2']],
                                    rownames=[score_list[0], score_list[0]],
                                    colnames=[score_list[1], score_list[1]],
                                    values=data[agg_col],
                                    aggfunc="count",
                                    margins=True,
                                    margins_name=margin_name)

        total_count = count_res.iloc[-1, -1] if count_res.shape[0] > 0 and count_res.shape[1] > 0 else 0
        if total_count == 0 or pd.isna(total_count):
            count_pct_res = count_res * np.nan
        else:
            count_pct_res = count_res / total_count

        if isinstance(res.columns, pd.MultiIndex):
            res = pd.concat([res, pd.concat({count_pct_name: count_pct_res}, axis=1)], axis=1)
        else:
            res = pd.concat({regular_cfg.get("func_name", str(actual_agg_func)): res,
                             count_pct_name: count_pct_res}, axis=1)

    return res

get_gains_table_by_cust_metrics

get_gains_table_by_cust_metrics(data, dep, nbins=10, precision=5, min_bin_prop=0.05, include_missing=True, score=None, model=None, varlist=None, equal_freq=True, chi2_method=False, grp_name=None, min_data_size=100, grp_colname=None, sync_range=True, chi2_p=0.95, init_equi_bins=100, fillna=-999999, spec_values=[], tree_binning=False, random_state=42, ascending=True, eval_metrics=['age', 'monthly_income', 'education'], metric_agg_func='mean', withSummary=False)

计算分组自定义指标收益表。

根据分组字段对数据进行分组,分别计算每个分组的自定义指标收益表。 收益表包含基础统计指标以及自定义指标的聚合值。

参数:

名称 类型 描述 默认
data DataFrame

输入数据表

必需
dep str

目标变量名

必需
nbins int

分箱数量

10
precision int

边界值精度

5
min_bin_prop float

每箱最小样本占比

0.05
include_missing bool

是否包含缺失值

True
score str

分数字段名

None
model sklearn-like model

机器学习模型

None
varlist list

模型特征列表

None
equal_freq bool

True为等频分箱

True
chi2_method bool

是否使用卡方分箱

False
grp_name str

分组字段名

None
min_data_size int

每组最小样本数

100
grp_colname str

分组结果列名

None
sync_range bool

是否同步分箱边界

True
chi2_p float

卡方检验显著性水平

0.95
init_equi_bins int

初始等频分箱数量

100
fillna any

缺失值填充值

-999999
spec_values list

特殊值列表

[]
tree_binning bool

是否使用决策树分箱

False
random_state int

随机种子

42
ascending bool

分箱顺序是否升序

True
eval_metrics list

需要统计的自定义指标列表

["age", "monthly_income", "education"]
metric_agg_func str or callable

自定义指标的聚合函数

"mean"
withSummary bool

是否包含总体汇总行

False

返回:

类型 描述
DataFrame

分组自定义指标收益表

源代码位于: Modeling_Tool/Eval/Model_Eval_Tool.py
def get_gains_table_by_cust_metrics(data, dep, nbins = 10, precision = 5, min_bin_prop = 0.05, include_missing = True, 
                                    score = None, model = None, varlist = None, equal_freq = True, chi2_method = False,
                                    grp_name = None, min_data_size = 100, grp_colname = None, sync_range = True,
                                    chi2_p = 0.95, init_equi_bins = 100, fillna = -999999, spec_values = [], 
                                    tree_binning = False, random_state=42, ascending = True,
                                    eval_metrics = ["age", "monthly_income", "education"], metric_agg_func = "mean", withSummary = False):
    """
    计算分组自定义指标收益表。

    根据分组字段对数据进行分组,分别计算每个分组的自定义指标收益表。
    收益表包含基础统计指标以及自定义指标的聚合值。

    Parameters
    ----------
    data : pandas.DataFrame
        输入数据表
    dep : str
        目标变量名
    nbins : int, default 10
        分箱数量
    precision : int, default 5
        边界值精度
    min_bin_prop : float, default 0.05
        每箱最小样本占比
    include_missing : bool, default True
        是否包含缺失值
    score : str, optional
        分数字段名
    model : sklearn-like model, optional
        机器学习模型
    varlist : list, optional
        模型特征列表
    equal_freq : bool, default True
        True为等频分箱
    chi2_method : bool, default False
        是否使用卡方分箱
    grp_name : str, optional
        分组字段名
    min_data_size : int, default 100
        每组最小样本数
    grp_colname : str, optional
        分组结果列名
    sync_range : bool, default True
        是否同步分箱边界
    chi2_p : float, default 0.95
        卡方检验显著性水平
    init_equi_bins : int, default 100
        初始等频分箱数量
    fillna : any, default -999999
        缺失值填充值
    spec_values : list, default []
        特殊值列表
    tree_binning : bool, default False
        是否使用决策树分箱
    random_state : int, default 42
        随机种子
    ascending : bool, default True
        分箱顺序是否升序
    eval_metrics : list, default ["age", "monthly_income", "education"]
        需要统计的自定义指标列表
    metric_agg_func : str or callable, default "mean"
        自定义指标的聚合函数
    withSummary : bool, default False
        是否包含总体汇总行

    Returns
    -------
    pandas.DataFrame
        分组自定义指标收益表
    """

    if grp_colname is None:
        grp_colname = grp_name

    if grp_name is None:
        fnl_df = _get_cust_gains_table_single(data = data, 
                                             dep = dep, 
                                             nbins = nbins, 
                                             precision = precision, 
                                             min_bin_prop = min_bin_prop, 
                                             include_missing = include_missing, 
                                             score = score, 
                                             model = model, 
                                             varlist = varlist, 
                                             equal_freq = equal_freq, 
                                             chi2_method = chi2_method,
                                             chi2_p = chi2_p, 
                                             init_equi_bins = init_equi_bins, 
                                             fillna = fillna, 
                                             spec_values = spec_values,
                                             eval_metrics = eval_metrics, 
                                             tree_binning = tree_binning, 
                                             random_state = random_state, 
                                             metric_agg_func = metric_agg_func,
                                             ascending = ascending,
                                             withSummary = withSummary)
        return fnl_df

    grp_list = data[grp_name].sort_values(ascending = True).unique().tolist()
    valid_grp_list = [x for x in grp_list if data[data[grp_name].isin([x])].shape[0] >= min_data_size]

    first_grp = valid_grp_list[0]
    data_grp = data[data[grp_name].isin([first_grp])]

    if sync_range:

        fnl_df = _get_cust_gains_table_single(data = data_grp, 
                                             dep = dep, 
                                             nbins = nbins, 
                                             precision = precision, 
                                             min_bin_prop = min_bin_prop, 
                                             include_missing = include_missing, 
                                             score = score, 
                                             model = model, 
                                             varlist = varlist, 
                                             equal_freq = equal_freq, 
                                             chi2_method = chi2_method,
                                             chi2_p = chi2_p, 
                                             init_equi_bins = init_equi_bins, 
                                             fillna = fillna, 
                                             spec_values = spec_values,
                                             eval_metrics = eval_metrics, 
                                             tree_binning = tree_binning, 
                                             random_state = random_state, 
                                             metric_agg_func = metric_agg_func,
                                             ascending = ascending,
                                             withSummary = withSummary)

        fnl_df[grp_colname] = first_grp

        nbins = get_bin_range_list(fnl_df.reset_index())
        equal_freq = False
        chi2_method = False

    else:

        fnl_df = _get_cust_gains_table_single(data = data_grp, 
                                             dep = dep, 
                                             nbins = nbins, 
                                             precision = precision, 
                                             min_bin_prop = min_bin_prop, 
                                             include_missing = include_missing, 
                                             score = score, 
                                             model = model, 
                                             varlist = varlist, 
                                             equal_freq = equal_freq, 
                                             chi2_method = chi2_method,
                                             chi2_p = chi2_p, 
                                             init_equi_bins = init_equi_bins, 
                                             fillna = fillna, 
                                             spec_values = spec_values,
                                             eval_metrics = eval_metrics, 
                                             tree_binning = tree_binning, 
                                             random_state = random_state, 
                                             metric_agg_func = metric_agg_func,
                                             ascending = ascending,
                                             withSummary = withSummary)

        fnl_df[grp_colname] = first_grp

    i = 1
    while i < len(valid_grp_list):

        grp = grp_list[i]
        data_grp = data[data[grp_name].isin([grp])]

        perf_res = _get_cust_gains_table_single(data = data_grp, 
                                               dep = dep, 
                                               nbins = nbins, 
                                               precision = precision, 
                                               min_bin_prop = min_bin_prop, 
                                               include_missing = include_missing, 
                                               score = score, 
                                               model = model, 
                                               varlist = varlist, 
                                               equal_freq = equal_freq, 
                                               chi2_method = chi2_method,
                                               chi2_p = chi2_p, 
                                               init_equi_bins = init_equi_bins, 
                                               fillna = fillna, 
                                               spec_values = spec_values, 
                                               eval_metrics = eval_metrics, 
                                               tree_binning = tree_binning, 
                                               random_state = random_state, 
                                               metric_agg_func = metric_agg_func,
                                               ascending = ascending,
                                               withSummary = withSummary)
        perf_res[grp_colname] = grp

        fnl_df = pd.concat([fnl_df, perf_res])
        i += 1

    return fnl_df

tie_score_rate

tie_score_rate(data, score)

计算分数重复率。

计算分数中非唯一值的比例,即存在重复的样本占比。

参数:

名称 类型 描述 默认
data DataFrame

输入数据表

必需
score str

分数字段名

必需

返回:

类型 描述
float

分数重复率(0到1之间)

示例:

>>> tie_score_rate(data, 'score')
0.15  # 表示15%的样本存在分数重复
源代码位于: Modeling_Tool/Eval/Model_Eval_Tool.py
def tie_score_rate(data, score):
    """
    计算分数重复率。

    计算分数中非唯一值的比例,即存在重复的样本占比。

    Parameters
    ----------
    data : pandas.DataFrame
        输入数据表
    score : str
        分数字段名

    Returns
    -------
    float
        分数重复率(0到1之间)

    Examples
    --------
    >>> tie_score_rate(data, 'score')
    0.15  # 表示15%的样本存在分数重复
    """

    n_unique_scr = len(data[score].unique())
    unique_scr_prop = n_unique_scr / data.shape[0]
    return (1 - unique_scr_prop)

score_unique_rate

score_unique_rate(data, score)

计算分数唯一率。

计算分数中唯一值占总样本数的比例。

参数:

名称 类型 描述 默认
data DataFrame

输入数据表(此参数保留但未使用)

必需
score array - like

分数字段或数组

必需

返回:

类型 描述
float

分数唯一率(0到1之间)

示例:

>>> score_unique_rate(data, data['score'])
0.85  # 表示85%的样本分数是唯一的
源代码位于: Modeling_Tool/Eval/Model_Eval_Tool.py
def score_unique_rate(data, score):
    """
    计算分数唯一率。

    计算分数中唯一值占总样本数的比例。

    Parameters
    ----------
    data : pandas.DataFrame
        输入数据表(此参数保留但未使用)
    score : array-like
        分数字段或数组

    Returns
    -------
    float
        分数唯一率(0到1之间)

    Examples
    --------
    >>> score_unique_rate(data, data['score'])
    0.85  # 表示85%的样本分数是唯一的
    """

    return len(np.unique(score)) / len(score)

链式评估流水线 — Evaluation_Tool

Evaluation_Tool

Model Evaluation Tool - Optimized Version

This module provides classes and functions for model evaluation, performance comparison, and gains analysis. It includes utilities for calculating various metrics and generating cross-risk summaries.

Author: Matrix Agent

EvaluationPipeline

链式调用的流水线对象,支持 .group_by() 和 .subset_by() 链式添加条件, 最后通过 .apply(func) 执行。 支持 func 返回 pandas.DataFrame 或 dict(值为 DataFrame)。

源代码位于: Modeling_Tool/Eval/Evaluation_Tool.py
class EvaluationPipeline:
    """
    链式调用的流水线对象,支持 .group_by() 和 .subset_by() 链式添加条件,
    最后通过 .apply(func) 执行。
    支持 func 返回 pandas.DataFrame 或 dict(值为 DataFrame)。
    """
    def __init__(self, m_eval, data=None):
        self.m_eval = m_eval
        self.original_data = m_eval.data if data is None else data
        self.steps = []
        self._expected_columns = None   # 用于 DataFrame 返回时的列名缓存
        self._expected_dict_keys = None # 用于 dict 返回时的键缓存

    def group_by(self, group_name, min_size=100, group_var_name=None):
        if group_var_name is None:
            group_var_name = group_name
        self.steps.append({
            'type': 'group',
            'group_name': group_name,
            'group_var_name': group_var_name,
            'min_size': min_size
        })
        return self

    def subset_by(self, condition_dict, name='eval_subset', min_size=100):
        self.steps.append({
            'type': 'subset',
            'condition_dict': condition_dict,
            'subset_var_name': name,
            'min_size': min_size
        })
        return self

    def apply(self, func, **kwargs):

        import inspect
        sig = inspect.signature(func)

        def _execute(step_idx, current_data, current_meta):
            if step_idx >= len(self.steps):
                old_data = self.m_eval.data
                self.m_eval.data = current_data
                try:
                    # 自动注入当前数据(如果函数接受 'current_data' 参数)
                    if 'current_data' in sig.parameters:
                        kwargs['current_data'] = current_data
                    res = func(**kwargs)
                except Exception as e:
                    logger.info(f"Error in group {current_meta}: {e}")
                    if self._expected_dict_keys is not None:
                        res = {k: pd.DataFrame() for k in self._expected_dict_keys}
                    else:
                        res = pd.DataFrame()
                finally:
                    self.m_eval.data = old_data

                # (处理 dict 和 DataFrame 的逻辑)
                if isinstance(res, dict):
                    if self._expected_dict_keys is None and res:
                        self._expected_dict_keys = list(res.keys())
                    for key, df in res.items():
                        if df is not None and not df.empty:
                            for col_name, col_val in current_meta.items():
                                df[col_name] = col_val
                        else:
                            empty_df = pd.DataFrame()
                            for col_name, col_val in current_meta.items():
                                empty_df[col_name] = [col_val]
                            res[key] = empty_df
                    return res
                else:
                    if self._expected_columns is None and res is not None and not res.empty:
                        self._expected_columns = res.columns.tolist()
                    if res is not None and not res.empty:
                        res = res.copy()
                        for key, val in current_meta.items():
                            res[key] = val
                    else:
                        if self._expected_columns is not None:
                            res = pd.DataFrame(columns=self._expected_columns)
                        else:
                            res = pd.DataFrame()
                    return res

            step = self.steps[step_idx]
            if step['type'] == 'group':
                group_name = step['group_name']
                group_var_name = step['group_var_name']
                min_size = step['min_size']
                results = []
                for gval in current_data[group_name].unique():
                    sub_data = current_data[current_data[group_name] == gval]
                    if len(sub_data) >= min_size:
                        new_meta = current_meta.copy()
                        new_meta[group_var_name] = gval
                        sub_res = _execute(step_idx + 1, sub_data, new_meta)
                        if sub_res is not None:
                            results.append(sub_res)
                if not results:
                    if self._expected_dict_keys is not None:
                        return {k: pd.DataFrame() for k in self._expected_dict_keys}
                    else:
                        return pd.DataFrame()
                # 合并结果
                if isinstance(results[0], dict):
                    merged = {k: pd.concat([r[k] for r in results], ignore_index=True) for k in results[0].keys()}
                    return merged
                else:
                    return pd.concat(results, ignore_index=True)

            elif step['type'] == 'subset':
                condition_dict = step['condition_dict']
                subset_var_name = step['subset_var_name']
                min_size = step['min_size']
                results = []
                for label, query in condition_dict.items():
                    if query == "":
                        sub_data = current_data.copy()
                    else:
                        sub_data = current_data.query(query)
                    if len(sub_data) >= min_size:
                        new_meta = current_meta.copy()
                        new_meta[subset_var_name] = label
                        sub_res = _execute(step_idx + 1, sub_data, new_meta)
                        if sub_res is not None:
                            results.append(sub_res)
                if not results:
                    if self._expected_dict_keys is not None:
                        return {k: pd.DataFrame() for k in self._expected_dict_keys}
                    else:
                        return pd.DataFrame()
                if isinstance(results[0], dict):
                    merged = {k: pd.concat([r[k] for r in results], ignore_index=True) for k in results[0].keys()}
                    return merged
                else:
                    return pd.concat(results, ignore_index=True)

        return _execute(0, self.original_data, {})

Utility_Functions

Utility functions for data processing and calculations.

源代码位于: Modeling_Tool/Eval/Evaluation_Tool.py
class Utility_Functions:
    """Utility functions for data processing and calculations."""

    @staticmethod
    def valid_average(x):
        """
        Calculate the average of positive values in a Series.

        This function filters out non-positive values (including zeros and negatives)
        before calculating the mean, useful for financial or count data where
        zero or negative values should not contribute to the average.

        Parameters
        ----------
        x : pandas.Series
            Input series of numeric values.

        Returns
        -------
        float
            Rounded average of positive values, or 0 if no valid values exist.

        Examples
        --------
        >>> import pandas as pd
        >>> Utility_Functions.valid_average(pd.Series([1, 2, -1, 0, 3]))
        2.0
        """
        valid_mask = x > 0
        valid_count = valid_mask.sum()
        valid_values = x[valid_mask]
        value_sum = valid_values.sum()

        if valid_count > 0:
            return round(value_sum / valid_count, 2)
        return 0

valid_average staticmethod

valid_average(x)

Calculate the average of positive values in a Series.

This function filters out non-positive values (including zeros and negatives) before calculating the mean, useful for financial or count data where zero or negative values should not contribute to the average.

参数:

名称 类型 描述 默认
x Series

Input series of numeric values.

必需

返回:

类型 描述
float

Rounded average of positive values, or 0 if no valid values exist.

示例:

>>> import pandas as pd
>>> Utility_Functions.valid_average(pd.Series([1, 2, -1, 0, 3]))
2.0
源代码位于: Modeling_Tool/Eval/Evaluation_Tool.py
@staticmethod
def valid_average(x):
    """
    Calculate the average of positive values in a Series.

    This function filters out non-positive values (including zeros and negatives)
    before calculating the mean, useful for financial or count data where
    zero or negative values should not contribute to the average.

    Parameters
    ----------
    x : pandas.Series
        Input series of numeric values.

    Returns
    -------
    float
        Rounded average of positive values, or 0 if no valid values exist.

    Examples
    --------
    >>> import pandas as pd
    >>> Utility_Functions.valid_average(pd.Series([1, 2, -1, 0, 3]))
    2.0
    """
    valid_mask = x > 0
    valid_count = valid_mask.sum()
    valid_values = x[valid_mask]
    value_sum = valid_values.sum()

    if valid_count > 0:
        return round(value_sum / valid_count, 2)
    return 0

Model_Evaluation_Tool

A comprehensive tool for model evaluation, performance comparison, and gains analysis.

This class provides methods for: - Base score calculation and correlation analysis - Model performance comparison across multiple metrics - Gains table generation and analysis - Cross-risk analysis - Multi-dimensional evaluation with subsets and groupings

参数:

名称 类型 描述 默认
data DataFrame

Input data containing features and target variables.

必需
dep str

Name of the dependent/target variable.

必需
comp_scrlist list

List of comparison score column names.

必需
model object

Trained model with predict_proba method. Default is None.

None
base_score str

Name of the base score column. Default is None.

None
min_data_size int

Minimum data size threshold. Default is 500.

500
equal_freq bool

Whether to use equal frequency binning. Default is True.

True
precision int

Decimal precision for rounding. Default is 5.

5
chi2_method bool

Whether to use chi-square method. Default is False.

False
chi2_p float

Chi-square p-value threshold. Default is 0.999.

0.999
init_equi_bins int

Initial number of equal frequency bins. Default is 500.

500
tree_binning bool

Whether to use tree-based binning. Default is False.

False
random_seed int

Random seed for reproducibility. Default is 3407.

3407
nbins int

Number of bins for analysis. Default is 10.

10
min_bin_prop float

Minimum proportion for each bin. Default is 0.05.

0.05
include_missing bool

Whether to include missing values. Default is True.

True
missing_rate_ref int or float

Reference value for missing rate. Default is -99999999.

-99999999
excel_path str

Path for Excel output. Default is None.

None

属性:

名称 类型 描述
data DataFrame

The input data (may be modified during processing).

dep str

Target variable name.

base_score str or None

Name of the base score column.

comp_scrlist list

List of comparison score names.

eval_ylabels list

Labels for y-axis evaluation.

gains_display_metric_list list

Metrics to display in gains summary.

源代码位于: Modeling_Tool/Eval/Evaluation_Tool.py
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
class Model_Evaluation_Tool:
    """
    A comprehensive tool for model evaluation, performance comparison, and gains analysis.

    This class provides methods for:
    - Base score calculation and correlation analysis
    - Model performance comparison across multiple metrics
    - Gains table generation and analysis
    - Cross-risk analysis
    - Multi-dimensional evaluation with subsets and groupings

    Parameters
    ----------
    data : pandas.DataFrame
        Input data containing features and target variables.
    dep : str
        Name of the dependent/target variable.
    comp_scrlist : list
        List of comparison score column names.
    model : object, optional
        Trained model with predict_proba method. Default is None.
    base_score : str, optional
        Name of the base score column. Default is None.
    min_data_size : int, optional
        Minimum data size threshold. Default is 500.
    equal_freq : bool, optional
        Whether to use equal frequency binning. Default is True.
    precision : int, optional
        Decimal precision for rounding. Default is 5.
    chi2_method : bool, optional
        Whether to use chi-square method. Default is False.
    chi2_p : float, optional
        Chi-square p-value threshold. Default is 0.999.
    init_equi_bins : int, optional
        Initial number of equal frequency bins. Default is 500.
    tree_binning : bool, optional
        Whether to use tree-based binning. Default is False.
    random_seed : int, optional
        Random seed for reproducibility. Default is 3407.
    nbins : int, optional
        Number of bins for analysis. Default is 10.
    min_bin_prop : float, optional
        Minimum proportion for each bin. Default is 0.05.
    include_missing : bool, optional
        Whether to include missing values. Default is True.
    missing_rate_ref : int or float, optional
        Reference value for missing rate. Default is -99999999.
    excel_path : str, optional
        Path for Excel output. Default is None.

    Attributes
    ----------
    data : pandas.DataFrame
        The input data (may be modified during processing).
    dep : str
        Target variable name.
    base_score : str or None
        Name of the base score column.
    comp_scrlist : list
        List of comparison score names.
    eval_ylabels : list
        Labels for y-axis evaluation.
    gains_display_metric_list : list
        Metrics to display in gains summary.
    """

    def __init__(
        self,
        data: pd.DataFrame,
        dep: str,
        comp_scrlist: List[str],
        model: Any = None,
        base_score: Optional[str] = None,
        min_data_size: int = 500,
        equal_freq: bool = True,
        precision: int = 5,
        chi2_method: bool = False,
        chi2_p: float = 0.999,
        init_equi_bins: int = 500,
        tree_binning: bool = False,
        random_seed: int = 3407,
        nbins: int = 10,
        min_bin_prop: float = 0.05,
        include_missing: bool = True,
        missing_rate_ref: Union[int, float] = -99999999,
        excel_path: Optional[str] = None,
        fillna=-999999,
        spec_values=None,
        cross_agg_dict: Optional[Dict] = None,
        subset_condition_dict: Optional[Dict[str, str]] = None,
        eval_ylabels: Optional[List[str]] = None,
        grp_namelist: Optional[List[str]] = None,
        gains_display_metric_list: Optional[List[str]] = None,
        weight_col: Optional[str] = None,
        positive_score_only: bool = True,
    ):
        """
        Initialize the Model_Evaluation_Tool with configuration parameters.

        Parameters
        ----------
        data : pandas.DataFrame
            Input dataset for evaluation.
        dep : str
            Name of the target/dependent variable.
        comp_scrlist : list
            List of comparison score column names.
        model : object, optional
            Trained model with predict_proba method.
        base_score : str, optional
            Base score column name.
        min_data_size : int, optional
            Minimum data size threshold.
        equal_freq : bool, optional
            Use equal frequency binning if True.
        precision : int, optional
            Decimal precision for calculations.
        chi2_method : bool, optional
            Use chi-square method if True.
        chi2_p : float, optional
            Chi-square p-value threshold.
        init_equi_bins : int, optional
            Initial equal bins count.
        tree_binning : bool, optional
            Use tree-based binning if True.
        random_seed : int, optional
            Random seed value.
        nbins : int, optional
            Number of bins.
        min_bin_prop : float, optional
            Minimum bin proportion.
        include_missing : bool, optional
            Include missing values if True.
        missing_rate_ref : int or float, optional
            Missing rate reference value.
        excel_path : str, optional
            Path for Excel output file.
        """
        self.data = data
        self.dep = dep
        self.model = model
        self.comp_scrlist = comp_scrlist
        self.base_score = base_score
        self.min_data_size = min_data_size
        self.equal_freq = equal_freq
        self.precision = precision
        self.nbins = nbins
        self.min_bin_prop = min_bin_prop
        self.include_missing = include_missing
        self.tree_binning = tree_binning
        self.seed = random_seed
        self.excel_path = excel_path
        self.missing_rate_ref = missing_rate_ref
        self.fillna = fillna
        self.spec_values = [] if spec_values is None else spec_values
        self.weight_col = weight_col
        self.positive_score_only = positive_score_only

        self.chi2_method = chi2_method
        self.chi2_p = chi2_p
        self.init_equi_bins = init_equi_bins

        self.udf = Utility_Functions()
        self.cross_agg_dict = (
            cross_agg_dict if cross_agg_dict is not None
            else _legacy_cross_agg_dict(self.udf)
        )
        self.subset_condition_dict = (
            subset_condition_dict if subset_condition_dict is not None
            else _legacy_subset_condition_dict()
        )
        self.eval_ylabels = (
            eval_ylabels if eval_ylabels is not None else list(_LEGACY_EVAL_YLABELS)
        )
        self.grp_namelist = (
            grp_namelist if grp_namelist is not None else list(_LEGACY_GRP_NAMELIST)
        )
        self.gains_display_metric_list = (
            gains_display_metric_list if gains_display_metric_list is not None
            else list(_DEFAULT_GAINS_DISPLAY_METRICS)
        )

    def _score_order(self) -> List[str]:
        scores = []
        if self.base_score is not None:
            scores.append(self.base_score)
        scores.extend(self.comp_scrlist or [])
        return scores

    def _filter_positive_scores(self, data: pd.DataFrame, scores: List[str]) -> pd.DataFrame:
        if not self.positive_score_only or not scores:
            return data
        mask = pd.Series(True, index=data.index)
        for score in scores:
            if score in data.columns:
                mask &= data[score] > 0
        return data.loc[mask]

    def _build_performance_evaluator(
        self,
        score: str,
        dist_bins: int,
        pct_bins: int,
        min_bin_prop: Optional[float] = None,
        include_missing: Optional[bool] = None,
        equal_freq: Optional[bool] = None,
    ) -> PerformanceEvaluator:
        return PerformanceEvaluator(
            tgt_name=self.dep,
            scr_name=score,
            dist_bins=dist_bins,
            pct_bins=pct_bins,
            precision=self.precision,
            min_bin_prop=self.min_bin_prop if min_bin_prop is None else min_bin_prop,
            include_missing=False if include_missing is None else include_missing,
            equal_freq=self.equal_freq if equal_freq is None else equal_freq,
            chi2_method=self.chi2_method,
            init_equi_bins=self.init_equi_bins,
            chi2_p=self.chi2_p,
            tree_binning=self.tree_binning,
            random_state=self.seed,
            weight_col=self.weight_col,
        )

    def _build_gains_calculator(
        self,
        data: pd.DataFrame,
        score: str,
        nbins: int,
        fillna,
        spec_values,
        include_missing: bool,
        ascending: bool = True,
    ) -> GainsTableCalculator:
        return GainsTableCalculator(
            data=data,
            dep=self.dep,
            score=score,
            nbins=nbins,
            precision=self.precision,
            min_bin_prop=self.min_bin_prop,
            include_missing=include_missing,
            equal_freq=self.equal_freq,
            chi2_method=self.chi2_method,
            chi2_p=self.chi2_p,
            init_equi_bins=self.init_equi_bins,
            fillna=fillna,
            spec_values=spec_values,
            tree_binning=self.tree_binning,
            random_state=self.seed,
            ascending=ascending,
            weight_col=self.weight_col,
        )

    def __init_excel_master(self):
        """
        Initialize Excel Master for report generation.

        Returns
        -------
        ExcelMaster
            Initialized ExcelMaster instance.

        Notes
        -----
        Requires ExcelMaster package to be installed.
        """
        from ExcelMaster.ExcelMaster import ExcelMaster

        em = ExcelMaster(filepath=self.excel_path, verbose=False)
        return em

    def get_base_score(self, scorename: str = '_base_model_score_', disp: bool = False):
        """
        Calculate base model scores using the provided model.

        This method uses the model's predict_proba method to generate scores
        for the input data and optionally displays correlation analysis.

        Parameters
        ----------
        scorename : str, optional
            Name for the score column. Default is '_base_model_score_'.
        disp : bool, optional
            Whether to display intermediate results. Default is False.

        Returns
        -------
        Model_Evaluation_Tool
            Returns self for method chaining.

        Raises
        ------
        AttributeError
            If model is None or lacks required attributes.
        """
        if self.model is None:
            raise AttributeError("Model must be provided to calculate base score.")

        data = self.data.copy()
        model = self.model

        inputs = model.feature_names_in_.tolist()
        data[scorename] = model.predict_proba(data[inputs])[:, 1]
        self.data = data
        self.base_score = scorename

        scrlist = [scorename] + self.comp_scrlist
        if disp:
            from IPython.display import display
            from Modeling_Tool.Feature.Distribution_Tool import proc_means_by_grp
            display(proc_means_by_grp(data, scrlist))

        tmp_data = data[data[scrlist] > 0]

        if disp:
            from IPython.display import display
            display(tmp_data[scrlist].corr())

        return self

    def get_score_correlation(
        self,
        score_list: Optional[List[str]] = None,
        method: str = 'pearson'
    ) -> pd.DataFrame:
        """
        Calculate correlation matrix between score columns.

        Parameters
        ----------
        score_list : list, optional
            List of score column names. If None, uses base_score and comp_scrlist.
        method : str, optional
            Correlation method ('pearson', 'spearman', 'kendall'). Default is 'pearson'.

        Returns
        -------
        pandas.DataFrame
            Correlation summary in long format with columns: 'base', 'compare', 'corr'.
        """
        if score_list is None:
            score_list = [self.base_score] + self.comp_scrlist

        if self.base_score is None:
            raise ValueError("Base score must be set before calculating correlation.")

        data = self.data.copy()

        score_query = " and ".join([s + " > 0" for s in score_list])
        data = data.query(score_query)

        corr_summary = data[score_list]\
            .corr(method=method)\
            .reset_index(drop=False)\
            .melt(id_vars=['index'], var_name='compare', value_name='corr')\
            .rename(columns={"index": "base"})

        return corr_summary

    def model_perf_compare(
        self,
        data: Optional[pd.DataFrame] = None,
        grp_name: Optional[str] = None,
        dist_bins: int = 100,
        pct_bins: int = 10,
        min_data_size: int = 50,
        sync_data_size: bool = True,
        min_bin_prop: Optional[float] = None,
        include_missing: Optional[bool] = None,
        equal_freq: Optional[bool] = None,
    ) -> pd.DataFrame:
        """
        Compare model performance across different scores.

        This method calculates performance metrics (AUC, KS, etc.) for the base score
        and all comparison scores via ``PerformanceEvaluator``, optionally filtering
        by group.

        Parameters
        ----------
        data : pandas.DataFrame, optional
            Data to use. If None, uses self.data.
        grp_name : str, optional
            Group name column for stratification.
        dist_bins : int, optional
            Number of distribution bins. Default is 100.
        pct_bins : int, optional
            Number of percentile bins. Default is 10.
        min_data_size : int, optional
            Minimum data size per bin. Default is 50.
        sync_data_size : bool, optional
            Whether to filter out zero/negative scores. Default is True.
        min_bin_prop : float, optional
            Minimum bin proportion. Defaults to the instance setting.
        include_missing : bool, optional
            Whether to include missing values in performance binning.
            Defaults to ``False`` for performance comparison.
        equal_freq : bool, optional
            Use equal-frequency binning. Defaults to the instance setting.

        Returns
        -------
        pandas.DataFrame
            Performance comparison results sorted by score order.
        """
        data = self.data.copy() if data is None else data.copy()
        score_list = self.comp_scrlist
        dep = self.dep
        base_score = self.base_score
        sample_name = 'oot'

        if base_score is None:
            return pd.DataFrame()

        valid_mask = (
            data[dep].notna()
            & data[base_score].notna()
            & np.isfinite(data[base_score])
        )
        clean_data = data.loc[valid_mask]
        if clean_data.empty:
            return pd.DataFrame()

        shared_data = clean_data.copy()
        if sync_data_size and score_list:
            shared_data = self._filter_positive_scores(shared_data, score_list)

        perf_comp_dict = {}
        score_order = self._score_order()

        for score in score_order:
            if score not in clean_data.columns:
                continue
            score_data = shared_data if sync_data_size and score in (score_list or []) else clean_data
            score_data = score_data.loc[score_data[score] > 0] if self.positive_score_only else score_data
            if score_data.empty:
                continue

            evaluator = self._build_performance_evaluator(
                score=score,
                dist_bins=dist_bins,
                pct_bins=pct_bins,
                min_bin_prop=min_bin_prop,
                include_missing=include_missing,
                equal_freq=equal_freq,
            )
            perf = evaluator.add_dataset(sample_name, score_data).evaluate(
                oot_grp_name=grp_name,
                min_data_size=min_data_size,
                to_show=False,
                display=False,
            )
            if isinstance(perf, pd.DataFrame) and not perf.empty and 'index' in perf.columns:
                perf = perf.query(f"index == '{sample_name}'")
            perf_comp_dict[score] = perf

        if not perf_comp_dict:
            return pd.DataFrame()

        perf_comp_res = []
        for score_name, perf in perf_comp_dict.items():
            perf = perf.copy()
            perf['score_name'] = score_name
            perf_comp_res.append(perf)

        perf_comp_res = pd.concat(perf_comp_res)

        drop_cols = ['AUC_Shift', 'KS_Shift']
        for col in drop_cols:
            if col in perf_comp_res.columns:
                perf_comp_res = perf_comp_res.drop(columns=[col])

        order_map = {val: i for i, val in enumerate(score_order) if val in perf_comp_dict}
        perf_comp_res['sort_key'] = perf_comp_res['score_name'].map(order_map)
        perf_comp_res_sorted = perf_comp_res.sort_values('sort_key').drop('sort_key', axis=1)

        return perf_comp_res_sorted

    def get_gains_summary(
        self,
        data: Optional[pd.DataFrame] = None,
        grp_name: Optional[str] = None,
        disp: bool = True,
        grp_disp_metric: List[str] = None,
        grp_nbins: int = 5,
        withSummary: bool = True,
        add_func: Optional[Callable] = None,
        sync_range: bool = True,
        spec_values=None,
        include_missing: Optional[bool] = None,
        fillna=None,
    ) -> pd.DataFrame:
        """
        Generate gains summary table for score analysis.

        This method creates gains tables via ``GainsTableCalculator``, showing the
        distribution of targets across score bins, with optional group stratification.
        Custom metrics can be injected through ``add_func``.

        Parameters
        ----------
        data : pandas.DataFrame, optional
            Data to use. If None, uses self.data.
        grp_name : str, optional
            Group name for stratified analysis.
        disp : bool, optional
            Whether to display results. Default is True.
        grp_disp_metric : list, optional
            Metrics to display for grouped analysis.
        grp_nbins : int, optional
            Number of bins for grouped analysis. Default is 5.
        withSummary : bool, optional
            Include summary row. Default is True.
        add_func : callable, optional
            Custom metric function merged into each gains table.
        sync_range : bool, optional
            Synchronize bin ranges. Default is True.
        spec_values : list, optional
            Special values to treat separately during binning.
        include_missing : bool, optional
            Whether to include missing values. Defaults to the instance setting.
        fillna : any, optional
            Missing-value fill for score binning. Defaults to the instance setting.

        Returns
        -------
        pandas.DataFrame
            Gains summary results with score names.
        """
        if grp_disp_metric is None:
            grp_disp_metric = ['N', 'PROP', 'AVG_BAD', 'LIFT']

        if fillna is None:
            fillna = self.fillna
        if spec_values is None:
            spec_values = self.spec_values
        if include_missing is None:
            include_missing = self.include_missing

        new_oot_df = self.data.copy() if data is None else data.copy()

        dep = self.dep
        base_score = self.base_score
        min_data_size = self.min_data_size
        nbins = self.nbins if grp_name is None else grp_nbins

        score_list = self._score_order()
        display_metric_list = self.gains_display_metric_list if add_func is None else None

        if self.data is None or len(self.data) == 0:
            empty_df = pd.DataFrame()
            empty_df.index = pd.MultiIndex.from_tuples([], names=['_bin_num', '_bin_range'])
            return empty_df

        gains_table_dict = {}
        for score in score_list:
            if score not in new_oot_df.columns:
                continue

            calculator = self._build_gains_calculator(
                data=new_oot_df,
                score=score,
                nbins=nbins,
                fillna=fillna,
                spec_values=spec_values,
                include_missing=include_missing,
                ascending=True,
            )

            if grp_name is None:
                gains_table_dict[score] = calculator.calculate(
                    withSummary=withSummary,
                    add_func=add_func,
                )
                if display_metric_list is not None:
                    gains_table_dict[score] = gains_table_dict[score][display_metric_list]
                gains_table_dict[score] = gains_table_dict[score].reset_index(drop=False)
            else:
                if new_oot_df.query(f"{score} > 0").shape[0] > 10:
                    oot_by_group = calculator.calculate(
                        grp_name=grp_name,
                        min_data_size=min_data_size,
                        sync_range=sync_range,
                        retSummary=False,
                        withSummary=withSummary,
                        add_func=add_func,
                    )
                    if display_metric_list is not None:
                        oot_by_group = oot_by_group[[*display_metric_list, grp_name]]
                    oot_by_group = oot_by_group.reset_index(drop=False)

                    grp_metric_dict = {}
                    for value in [x for x in grp_disp_metric if x not in ['PROP']]:
                        grp_metric_dict[value] = oot_by_group\
                            .reset_index(drop=False)\
                            .pivot_table(
                                index=['_bin_num', '_bin_range'],
                                columns=[grp_name],
                                values=value,
                                margins=True,
                                margins_name="Grand_Total",
                            )

                    if 'PROP' in grp_disp_metric:
                        tot_cnt = oot_by_group\
                            .reset_index(drop=False)\
                            .pivot_table(
                                index=['_bin_num', '_bin_range'],
                                columns=[grp_name],
                                values=['N'],
                                margins=True,
                                margins_name="Grand_Total",
                                aggfunc='sum',
                            )

                        if tot_cnt.shape[0] > 0:
                            grp_metric_dict['PROP'] = tot_cnt.iloc[0:tot_cnt.shape[0]:] / np.array(
                                tot_cnt.iloc[-1:].T['Grand_Total'].tolist()
                            )
                            grp_metric_dict['PROP'].columns = grp_metric_dict['PROP'].rename(
                                columns={'N': ''}
                            ).columns.map("".join)

                    fnl_grp_gains_res = []
                    for colname, gains in grp_metric_dict.items():
                        gains = gains.copy()
                        gains['variable'] = colname
                        if disp:
                            from IPython.display import display
                            display(gains)
                        fnl_grp_gains_res.append(gains)
                    fnl_grp_gains_res = pd.concat(fnl_grp_gains_res)

                    gains_table_dict[score] = oot_by_group

        fnl_gains_res = []
        for score_name, gains_res in gains_table_dict.items():
            gains_res = gains_res.copy()
            gains_res['score_name'] = score_name
            fnl_gains_res.append(gains_res)

        if not fnl_gains_res:
            return pd.DataFrame()

        return pd.concat(fnl_gains_res)

    def get_cross_risk_summary(
        self,
        cross_agg_dict: Optional[Dict] = None,
        nbins: int = 5,
        equal_freq: Optional[bool] = None,
        disp: bool = True,
        spec_values=None,
        binning_numeric=None,
    ) -> pd.DataFrame:
        """
        Generate cross-risk analysis summary between base and comparison scores.

        This method creates a comprehensive cross-tabulation showing risk metrics
        across different score ranges for both base and comparison scores.

        Parameters
        ----------
        cross_agg_dict : dict, optional
            Dictionary of column names and aggregation functions.
        nbins : int, optional
            Number of bins. Default is 5.
        equal_freq : bool, optional
            Use equal frequency binning. If None, uses self.equal_freq.
        disp : bool, optional
            Whether to display results. Default is True.
        spec_values : list, optional
            Special values to keep separate during binning.
        binning_numeric : bool or list/tuple of bool, optional
            Whether numeric score columns should be binned before cross-risk
            aggregation. If None, defaults to [True, True] for backward
            compatibility. Passing a bool applies the same setting to both
            base and comparison score.

        Returns
        -------
        pandas.DataFrame
            Cross-risk summary with base score range, comparison score range, and metrics.
        """
        data = self.data.copy()
        dep = self.dep
        min_bin_prop = self.min_bin_prop
        precision = self.precision
        tree_binning = self.tree_binning
        equal_freq = equal_freq if equal_freq is not None else self.equal_freq
        fillna = self.missing_rate_ref
        include_missing = self.include_missing

        base_score = self.base_score
        compare_scrlist = self.comp_scrlist

        if cross_agg_dict is None:
            cross_agg_dict = self.cross_agg_dict.copy()
        if spec_values is None:
            spec_values = self.spec_values
        if binning_numeric is None:
            binning_numeric = [True, True]
        elif isinstance(binning_numeric, (bool, np.bool_)):
            binning_numeric = [bool(binning_numeric), bool(binning_numeric)]
        elif isinstance(binning_numeric, (list, tuple)):
            if len(binning_numeric) != 2:
                raise ValueError(
                    "binning_numeric must be a bool or a two-element list/tuple."
                )
            if not all(isinstance(value, (bool, np.bool_)) for value in binning_numeric):
                raise TypeError("binning_numeric values must be booleans.")
            binning_numeric = [bool(binning_numeric[0]), bool(binning_numeric[1])]
        else:
            raise TypeError("binning_numeric must be None, a bool, or a two-element list/tuple.")

        def _ensure_three_level_columns(frame: pd.DataFrame) -> pd.DataFrame:
            frame = frame.copy()
            if isinstance(frame.columns, pd.MultiIndex):
                normalized_columns = []
                for col in frame.columns:
                    col_tuple = tuple(col)
                    if len(col_tuple) < 3:
                        col_tuple = col_tuple + ("",) * (3 - len(col_tuple))
                    elif len(col_tuple) > 3:
                        col_tuple = col_tuple[:2] + ("_".join(map(str, col_tuple[2:])),)
                    normalized_columns.append(col_tuple)
                frame.columns = pd.MultiIndex.from_tuples(normalized_columns)
            else:
                frame.columns = pd.MultiIndex.from_tuples(
                    [(col, "", "") for col in frame.columns]
                )
            return frame

        multi_scr_res = {}
        for compare_scr in compare_scrlist:
            logger.info(compare_scr)
            score_list = [base_score, compare_scr]
            score_query = " and ".join([s + " > 0" for s in score_list])

            prepared_data = data.query(score_query)

            if prepared_data.empty:
                return pd.DataFrame()  # 或跳过该组

            tot_cnt = prepared_data[score_list].shape[0]
            cross_agg_dict_copy = cross_agg_dict.copy()
            cross_agg_dict_copy.update({'flow_id': ['count', lambda x: x.count() / tot_cnt]})

            cross_res = {}
            for colname, aggfunc in cross_agg_dict_copy.items():
                if data.shape[0] > nbins:
                    cross_res[colname] = cross_risk(
                        data=prepared_data,
                        score_list=[base_score, compare_scr],
                        dep=dep,
                        nbins=nbins,
                        agg_col=colname,
                        precision=precision,
                        min_bin_prop=min_bin_prop,
                        include_missing=include_missing,
                        equal_freq=equal_freq,
                        binning_numeric=binning_numeric,
                        tree_binning=tree_binning,
                        agg_func=aggfunc,
                        fillna=fillna,
                        spec_values=spec_values
                    )

            fnl_res = []
            for colname, res in cross_res.items():
                res = _ensure_three_level_columns(res)
                res[('', 'eval_metric', '')] = colname
                res = res.rename(columns={"<lambda>": "", "count": "n"})
                if disp:
                    from IPython.display import display
                    display(res)
                fnl_res.append(res)

            fnl_res = pd.concat(fnl_res)

            fnl_res[('', 'score_name', '')] = compare_scr
            multi_scr_res[compare_scr] = fnl_res

        combined_cross_res = []
        for score_name, cross_data in multi_scr_res.items():
            cross_data = cross_data.copy()
            cross_data.columns = cross_data.columns.map(
                lambda x: f"{x[0]}_{x[1] if len(str(x[1])) >= 2 else ('0' + str(x[1]))}_{x[2]}".strip("_")
            )
            cross_data.index = cross_data.index.map(
                lambda x: f"{x[0] if len(str(x[0])) >= 2 else ('0' + str(x[0]))}_{x[1]}".strip("_")
            )
            cross_data = cross_data.reset_index(drop=False).melt(
                id_vars=['index', 'eval_metric', 'score_name'],
                var_name='comp_scr_range'
            ).rename(columns={"index": "base_scr_range"})
            combined_cross_res.append(cross_data)
        combined_cross_res = pd.concat(combined_cross_res)

        return combined_cross_res

    def multi_subset_wrapper(
        self,
        condition_dict: Optional[Dict[str, str]] = None,
        subset_var_name: str = 'eval_subset',
        func: Optional[Callable] = None,
        min_subset_size = 10,
        **kwargs
    ) -> pd.DataFrame:
        """
        Apply evaluation function across multiple data subsets.

        This method iterates over predefined subset conditions, filters the data
        accordingly, and applies the evaluation function to each subset.

        Parameters
        ----------
        condition_dict : dict, optional
            Dictionary mapping subset names to query conditions.
            If None, uses self.subset_condition_dict.
        subset_var_name : str, optional
            Column name for subset identifier in results. Default is 'eval_subset'.
        func : callable, optional
            Function to apply to each subset. If None, uses model_perf_compare.
        **kwargs
            Additional keyword arguments passed to the evaluation function.

        Returns
        -------
        pandas.DataFrame
            Combined results from all subsets.
        """
        if condition_dict is None:
            condition_dict = self.subset_condition_dict

        original_data = self.data.copy()
        if func is None:
            func = self.model_perf_compare

        fnl_subset_results = []

        if condition_dict:
            for group_value, query in condition_dict.items():
                logger.info(f"INFO:: Multi_Subset_Eval: Running Subset {group_value}")

                subset_data = self.data.query(query).copy() if query != "" else self.data.copy()
                subset_data_size = subset_data.shape[0]

                if subset_data_size > min_subset_size:
                    self.data = subset_data

                    subset_result = func(**kwargs)
                    if subset_result is not None and not subset_result.empty:
                        subset_result = subset_result.copy()
                        subset_result[subset_var_name] = group_value
                        fnl_subset_results.append(subset_result)
                    else:
#                         fnl_subset_results.append(pd.DataFrame({subset_var_name: [group_value]}))
                        self.data = original_data
                        continue

                    self.data = original_data


        if fnl_subset_results:
            return pd.concat(fnl_subset_results)

        return pd.DataFrame({subset_var_name: [group_value]})

    def multi_ylabel_wrapper(
        self,
        ylabels: Optional[List[str]] = None,
        ylabel_var_name: str = 'eval_ylabel',
        eval_func: Optional[Callable] = None,
        **kwargs
    ) -> pd.DataFrame:
        """
        Apply evaluation function across multiple target labels.

        This method iterates over different target/label columns, temporarily
        updating the dependent variable for each iteration.

        Parameters
        ----------
        ylabels : list, optional
            List of target column names. If None, uses self.eval_ylabels.
        ylabel_var_name : str, optional
            Column name for label identifier in results. Default is 'eval_ylabel'.
        eval_func : callable, optional
            Function to apply for each label. If None, uses model_perf_compare.
        **kwargs
            Additional keyword arguments passed to the evaluation function.

        Returns
        -------
        pandas.DataFrame
            Combined results from all labels.
        """
        original_dep = self.dep
        original_data = self.data.copy()
        if eval_func is None:
            eval_func = self.model_perf_compare

        fnl_ylabel_results = []

        if ylabels is None:
            ylabels = self.eval_ylabels

        for current_dep in ylabels:
            logger.info(f"INFO:: Multi_yLabel_Eval: Running Label {current_dep}")

            input_data = self.data.query(f"{current_dep} == {current_dep}").copy()
            subset_data_size = input_data.shape[0]

            if subset_data_size > 0:
                self.dep = current_dep
                self.data = input_data

                subset_result = eval_func(**kwargs)
                subset_result = subset_result.copy()
                subset_result[ylabel_var_name] = current_dep
                fnl_ylabel_results.append(subset_result)

                self.dep = original_dep
                self.data = original_data

        if fnl_ylabel_results:
            return pd.concat(fnl_ylabel_results)
        return pd.DataFrame()

    def multi_group_wrapper(
        self,
        group_name: Optional[str] = None,
        group_var_name: str = 'group_name',
        group_eval_func: Optional[Callable] = None,
        min_subset_size: int = 10,
        **kwargs
    ) -> pd.DataFrame:
        """
        Apply evaluation function across multiple groups.

        This method splits data by a grouping column and applies the evaluation
        function to each group separately.

        Parameters
        ----------
        group_name : str, optional
            Column name to group by. If None, processes all data together.
        group_var_name : str, optional
            Column name for group identifier in results. Default is 'group_name'.
        group_eval_func : callable, optional
            Function to apply to each group. If None, uses model_perf_compare.
        min_subset_size : int, optional
            Minimum data size required to process a group. Default is 10.
        **kwargs
            Additional keyword arguments passed to the evaluation function.

        Returns
        -------
        pandas.DataFrame
            Combined results from all groups.
        """
        original_data = self.data.copy()

        if group_name is None:
            return group_eval_func(**kwargs) if group_eval_func else pd.DataFrame()

        group_value_list = list(set(original_data[group_name].unique().tolist()))

        if group_eval_func is None:
            group_eval_func = self.model_perf_compare

        fnl_group_results = []
        for group_value in group_value_list:
            logger.info(f"INFO:: Multi_Group_Eval: Running Group {group_value}")

            input_data = original_data.query(f"{group_name} == '{group_value}'").copy()
            subset_data_size = input_data.shape[0]

            if subset_data_size > min_subset_size:
                self.data = input_data

                subset_result = group_eval_func(**kwargs)
                subset_result = subset_result.copy()
                subset_result[group_var_name] = group_value
                fnl_group_results.append(subset_result)

                self.data = original_data

        if fnl_group_results:
            return pd.concat(fnl_group_results)
        return pd.DataFrame()

    def multi_dim_eval(
        self,
        condition_dict: Optional[Dict[str, str]] = None,
        eval_ylabels: Optional[List[str]] = None,
        grp_namelist: Optional[List[str]] = None,
        eval_func: Optional[Callable] = None,
        subset_var_name: str = 'eval_subset',
        ylabel_var_name: str = 'eval_ylabel',
        var_name: str = 'group_name',
        value_name: str = 'group_value',
        **kwargs
    ) -> pd.DataFrame:
        """
        Perform multi-dimensional evaluation across subsets, labels, and groups.

        This method combines multi_subset_wrapper and multi_ylabel_wrapper to
        evaluate model performance across different dimensions.

        Parameters
        ----------
        condition_dict : dict, optional
            Subset conditions. If None, uses self.subset_condition_dict.
        eval_ylabels : list, optional
            Target labels. If None, uses self.eval_ylabels.
        grp_namelist : list, optional
            Group columns. If None, uses self.grp_namelist.
        eval_func : callable, optional
            Evaluation function. If None, uses model_perf_compare.
        subset_var_name : str, optional
            Column name for subset identifier.
        ylabel_var_name : str, optional
            Column name for label identifier.
        var_name : str, optional
            Column name for group name in results.
        value_name : str, optional
            Column name for group value in results.
        **kwargs
            Additional keyword arguments.

        Returns
        -------
        pandas.DataFrame
            Multi-dimensional evaluation results.
        """
        if condition_dict is None:
            condition_dict = self.subset_condition_dict

        if eval_ylabels is None:
            eval_ylabels = self.eval_ylabels

        if grp_namelist is None:
            grp_namelist = self.grp_namelist

        if eval_func is None:
            eval_func = self.model_perf_compare

        fnl_perf_res = []
        for group_name in grp_namelist:
            logger.info(f"INFO:: Multi_Dim_Eval: Running Group {group_name}")

            sig = inspect.signature(eval_func)
            parameters = sig.parameters

            if 'grp_name' in parameters:
                sub_perf_res = self.multi_subset_wrapper(
                    condition_dict=condition_dict,
                    subset_var_name=subset_var_name,
                    func=self.multi_ylabel_wrapper, 
                    ylabels=eval_ylabels, 
                    ylabel_var_name=ylabel_var_name,
                    eval_func=eval_func, 
                    grp_name=group_name,
                    **kwargs
                )
            else:
                sub_perf_res = self.multi_subset_wrapper(
                    condition_dict=condition_dict,
                    subset_var_name=subset_var_name,
                    func=self.multi_ylabel_wrapper, 
                    ylabels=eval_ylabels, 
                    ylabel_var_name=ylabel_var_name,
                    eval_func=eval_func, 
                    **kwargs
                )

            sub_perf_res = sub_perf_res.copy()
            sub_perf_res[var_name] = group_name
            fnl_perf_res.append(sub_perf_res)

        fnl_perf_res = pd.concat(fnl_perf_res)

        if 'grp_name' in parameters:
            if all(x in fnl_perf_res.columns for x in grp_namelist):
                fnl_perf_res[value_name] = fnl_perf_res[grp_namelist].bfill(axis=1).iloc[:, 0]
                fnl_perf_res = fnl_perf_res.drop(columns=grp_namelist)

        return fnl_perf_res

    def cross_perf_eval(
        self,
        bad_list: List[str],
        scr_list: List[str],
        data: Optional[pd.DataFrame] = None,
        eval_metric: List[str] = None,
        melt: bool = True
    ) -> pd.DataFrame:
        """
        Cross-evaluate model performance across multiple bad flags and scores.

        This method calculates performance metrics for all combinations of
        bad flags and scores, useful for comparing model behavior across
        different definitions of "bad" outcomes.

        Parameters
        ----------
        bad_list : list
            List of bad/negative outcome column names.
        scr_list : list
            List of score column names.
        data : pandas.DataFrame, optional
            Data to use. If None, uses self.data.
        eval_metric : list, optional
            Metrics to extract. If None, uses ['AUC', 'KS', 'N'].
        melt : bool, optional
            Whether to melt the pivot table. Default is True.

        Returns
        -------
        pandas.DataFrame
            Cross performance evaluation results.
        """
        if eval_metric is None:
            eval_metric = ['AUC', 'KS', 'N']

        if data is None:
            data = self.data.copy()

        score_query = " and ".join([s + " > 0" for s in scr_list])
        bad_query = " and ".join([s + " == " + s for s in bad_list])

        data = data.query(bad_query)
        data = data.query(score_query)

        if data.shape[0] > self.nbins:
            fnl_res = []
            for bad in bad_list:
                for scr in scr_list:
                    perf = get_perf_summary(
                        None,
                        None,
                        data,
                        tgt_name=bad,
                        scr_name=scr,
                        to_show=False,
                        display=False,
                        dist_bins=self.nbins,
                        pct_bins=self.nbins,
                        precision=self.precision,
                        min_bin_prop=self.min_bin_prop,
                        equal_freq=True
                    )
                    perf = perf.copy()
                    perf['eval_ylabel'] = bad
                    perf['score_name'] = scr
                    fnl_res.append(perf)

            fnl_res = pd.concat(fnl_res)

            summary = fnl_res.pivot_table(index="score_name", columns='eval_ylabel', values=eval_metric)
            summary.columns = summary.columns.map(lambda x: f"{x[0]}_{x[1]}")

            if melt:
                summary = summary.reset_index(drop=False).melt(id_vars=['score_name'])

            return summary

        return pd.DataFrame(columns=['eval_ylabel', 'score_name'])

    def run_variable_analysis_summary(
        self,
        varlist: List[str],
        data: Optional[pd.DataFrame] = None,
        dep: Optional[str] = None,
        nbins: Optional[int] = None,
        equal_freq: Optional[bool] = None,
        min_bin_prop: Optional[float] = None,
        precision: Optional[int] = None,
        chi2_method: Optional[bool] = None,
        chi2_p: Optional[float] = None,
        init_equi_bins: Optional[int] = None,
        tree_binning: Optional[bool] = None,
        include_missing: Optional[bool] = None,
        missing_rate_ref: Optional[Union[int, float]] = None,
        seed: Optional[int] = None,
        spec_values = []
    ) -> pd.DataFrame:
        """
        Run comprehensive variable analysis and generate summary report.

        This method performs binning analysis on specified variables,
        calculating metrics like IV (Information Value) and chi-square statistics.

        Parameters
        ----------
        varlist : list
            List of variable names to analyze.
        data : pandas.DataFrame, optional
            Data to use. If None, uses self.data.
        dep : str, optional
            Target variable name. If None, uses self.dep.
        nbins : int, optional
            Number of bins. If None, uses self.nbins.
        equal_freq : bool, optional
            Use equal frequency binning. If None, uses self.equal_freq.
        min_bin_prop : float, optional
            Minimum bin proportion. If None, uses self.min_bin_prop.
        precision : int, optional
            Decimal precision. If None, uses self.precision.
        chi2_method : bool, optional
            Use chi-square method. If None, uses self.chi2_method.
        chi2_p : float, optional
            Chi-square p-value. If None, uses self.chi2_p.
        init_equi_bins : int, optional
            Initial equal bins. If None, uses self.init_equi_bins.
        tree_binning : bool, optional
            Use tree binning. If None, uses self.tree_binning.
        include_missing : bool, optional
            Include missing values. If None, uses self.include_missing.
        missing_rate_ref : int or float, optional
            Missing reference. If None, uses self.missing_rate_ref.
        seed : int, optional
            Random seed. If None, uses self.seed.

        Returns
        -------
        pandas.DataFrame
            Variable analysis summary with IV and other metrics.
        """
        if data is None:
            data = self.data.copy()

        if dep is None:
            dep = self.dep

        nbins = nbins if nbins is not None else self.nbins
        equal_freq = equal_freq if equal_freq is not None else self.equal_freq
        min_bin_prop = min_bin_prop if min_bin_prop is not None else self.min_bin_prop
        precision = precision if precision is not None else self.precision
        chi2_method = chi2_method if chi2_method is not None else self.chi2_method
        chi2_p = chi2_p if chi2_p is not None else self.chi2_p
        init_equi_bins = init_equi_bins if init_equi_bins is not None else self.init_equi_bins
        tree_binning = tree_binning if tree_binning is not None else self.tree_binning
        include_missing = include_missing if include_missing is not None else self.include_missing
        missing_rate_ref = missing_rate_ref if missing_rate_ref is not None else self.missing_rate_ref
        seed = seed if seed is not None else self.seed

        from Modeling_Tool.Feature.Feature_Insights import VarExtractionInsights

        varInsights = VarExtractionInsights(
            data=data,
            dep=dep,
            plot_path=None,
            nbins=nbins,
            equal_freq=equal_freq,
            min_bin_prop=min_bin_prop,
            precision=precision,
            chi2_method=chi2_method,
            chi2_p=chi2_p,
            init_equi_bins=init_equi_bins,
            tree_binning=tree_binning,
            include_missing=include_missing,
            seed=seed,
            missing_rate_ref=missing_rate_ref,
            spec_values=spec_values
        )

        data = data.copy()
        for var in varlist:
            data[var] = data[var].fillna(missing_rate_ref)

        var_summary = varInsights.get_var_analysis_report(data=data, varlist=varlist, dep=dep, iv_cut=0)

        return var_summary


    def pipe(self, data=None):
            """
            返回一个 EvaluationPipeline 对象,用于链式分组和子集评估。

            Parameters
            ----------
            data : pd.DataFrame, optional
                指定数据,默认使用 self.data

            Returns
            -------
            EvaluationPipeline
            """
            return EvaluationPipeline(self, data)

get_base_score

get_base_score(scorename: str = '_base_model_score_', disp: bool = False)

Calculate base model scores using the provided model.

This method uses the model's predict_proba method to generate scores for the input data and optionally displays correlation analysis.

参数:

名称 类型 描述 默认
scorename str

Name for the score column. Default is 'base_model_score'.

'_base_model_score_'
disp bool

Whether to display intermediate results. Default is False.

False

返回:

类型 描述
Model_Evaluation_Tool

Returns self for method chaining.

引发:

类型 描述
AttributeError

If model is None or lacks required attributes.

源代码位于: Modeling_Tool/Eval/Evaluation_Tool.py
def get_base_score(self, scorename: str = '_base_model_score_', disp: bool = False):
    """
    Calculate base model scores using the provided model.

    This method uses the model's predict_proba method to generate scores
    for the input data and optionally displays correlation analysis.

    Parameters
    ----------
    scorename : str, optional
        Name for the score column. Default is '_base_model_score_'.
    disp : bool, optional
        Whether to display intermediate results. Default is False.

    Returns
    -------
    Model_Evaluation_Tool
        Returns self for method chaining.

    Raises
    ------
    AttributeError
        If model is None or lacks required attributes.
    """
    if self.model is None:
        raise AttributeError("Model must be provided to calculate base score.")

    data = self.data.copy()
    model = self.model

    inputs = model.feature_names_in_.tolist()
    data[scorename] = model.predict_proba(data[inputs])[:, 1]
    self.data = data
    self.base_score = scorename

    scrlist = [scorename] + self.comp_scrlist
    if disp:
        from IPython.display import display
        from Modeling_Tool.Feature.Distribution_Tool import proc_means_by_grp
        display(proc_means_by_grp(data, scrlist))

    tmp_data = data[data[scrlist] > 0]

    if disp:
        from IPython.display import display
        display(tmp_data[scrlist].corr())

    return self

get_score_correlation

get_score_correlation(score_list: Optional[List[str]] = None, method: str = 'pearson') -> DataFrame

Calculate correlation matrix between score columns.

参数:

名称 类型 描述 默认
score_list list

List of score column names. If None, uses base_score and comp_scrlist.

None
method str

Correlation method ('pearson', 'spearman', 'kendall'). Default is 'pearson'.

'pearson'

返回:

类型 描述
DataFrame

Correlation summary in long format with columns: 'base', 'compare', 'corr'.

源代码位于: Modeling_Tool/Eval/Evaluation_Tool.py
def get_score_correlation(
    self,
    score_list: Optional[List[str]] = None,
    method: str = 'pearson'
) -> pd.DataFrame:
    """
    Calculate correlation matrix between score columns.

    Parameters
    ----------
    score_list : list, optional
        List of score column names. If None, uses base_score and comp_scrlist.
    method : str, optional
        Correlation method ('pearson', 'spearman', 'kendall'). Default is 'pearson'.

    Returns
    -------
    pandas.DataFrame
        Correlation summary in long format with columns: 'base', 'compare', 'corr'.
    """
    if score_list is None:
        score_list = [self.base_score] + self.comp_scrlist

    if self.base_score is None:
        raise ValueError("Base score must be set before calculating correlation.")

    data = self.data.copy()

    score_query = " and ".join([s + " > 0" for s in score_list])
    data = data.query(score_query)

    corr_summary = data[score_list]\
        .corr(method=method)\
        .reset_index(drop=False)\
        .melt(id_vars=['index'], var_name='compare', value_name='corr')\
        .rename(columns={"index": "base"})

    return corr_summary

model_perf_compare

model_perf_compare(data: Optional[DataFrame] = None, grp_name: Optional[str] = None, dist_bins: int = 100, pct_bins: int = 10, min_data_size: int = 50, sync_data_size: bool = True, min_bin_prop: Optional[float] = None, include_missing: Optional[bool] = None, equal_freq: Optional[bool] = None) -> DataFrame

Compare model performance across different scores.

This method calculates performance metrics (AUC, KS, etc.) for the base score and all comparison scores via PerformanceEvaluator, optionally filtering by group.

参数:

名称 类型 描述 默认
data DataFrame

Data to use. If None, uses self.data.

None
grp_name str

Group name column for stratification.

None
dist_bins int

Number of distribution bins. Default is 100.

100
pct_bins int

Number of percentile bins. Default is 10.

10
min_data_size int

Minimum data size per bin. Default is 50.

50
sync_data_size bool

Whether to filter out zero/negative scores. Default is True.

True
min_bin_prop float

Minimum bin proportion. Defaults to the instance setting.

None
include_missing bool

Whether to include missing values in performance binning. Defaults to False for performance comparison.

None
equal_freq bool

Use equal-frequency binning. Defaults to the instance setting.

None

返回:

类型 描述
DataFrame

Performance comparison results sorted by score order.

源代码位于: Modeling_Tool/Eval/Evaluation_Tool.py
def model_perf_compare(
    self,
    data: Optional[pd.DataFrame] = None,
    grp_name: Optional[str] = None,
    dist_bins: int = 100,
    pct_bins: int = 10,
    min_data_size: int = 50,
    sync_data_size: bool = True,
    min_bin_prop: Optional[float] = None,
    include_missing: Optional[bool] = None,
    equal_freq: Optional[bool] = None,
) -> pd.DataFrame:
    """
    Compare model performance across different scores.

    This method calculates performance metrics (AUC, KS, etc.) for the base score
    and all comparison scores via ``PerformanceEvaluator``, optionally filtering
    by group.

    Parameters
    ----------
    data : pandas.DataFrame, optional
        Data to use. If None, uses self.data.
    grp_name : str, optional
        Group name column for stratification.
    dist_bins : int, optional
        Number of distribution bins. Default is 100.
    pct_bins : int, optional
        Number of percentile bins. Default is 10.
    min_data_size : int, optional
        Minimum data size per bin. Default is 50.
    sync_data_size : bool, optional
        Whether to filter out zero/negative scores. Default is True.
    min_bin_prop : float, optional
        Minimum bin proportion. Defaults to the instance setting.
    include_missing : bool, optional
        Whether to include missing values in performance binning.
        Defaults to ``False`` for performance comparison.
    equal_freq : bool, optional
        Use equal-frequency binning. Defaults to the instance setting.

    Returns
    -------
    pandas.DataFrame
        Performance comparison results sorted by score order.
    """
    data = self.data.copy() if data is None else data.copy()
    score_list = self.comp_scrlist
    dep = self.dep
    base_score = self.base_score
    sample_name = 'oot'

    if base_score is None:
        return pd.DataFrame()

    valid_mask = (
        data[dep].notna()
        & data[base_score].notna()
        & np.isfinite(data[base_score])
    )
    clean_data = data.loc[valid_mask]
    if clean_data.empty:
        return pd.DataFrame()

    shared_data = clean_data.copy()
    if sync_data_size and score_list:
        shared_data = self._filter_positive_scores(shared_data, score_list)

    perf_comp_dict = {}
    score_order = self._score_order()

    for score in score_order:
        if score not in clean_data.columns:
            continue
        score_data = shared_data if sync_data_size and score in (score_list or []) else clean_data
        score_data = score_data.loc[score_data[score] > 0] if self.positive_score_only else score_data
        if score_data.empty:
            continue

        evaluator = self._build_performance_evaluator(
            score=score,
            dist_bins=dist_bins,
            pct_bins=pct_bins,
            min_bin_prop=min_bin_prop,
            include_missing=include_missing,
            equal_freq=equal_freq,
        )
        perf = evaluator.add_dataset(sample_name, score_data).evaluate(
            oot_grp_name=grp_name,
            min_data_size=min_data_size,
            to_show=False,
            display=False,
        )
        if isinstance(perf, pd.DataFrame) and not perf.empty and 'index' in perf.columns:
            perf = perf.query(f"index == '{sample_name}'")
        perf_comp_dict[score] = perf

    if not perf_comp_dict:
        return pd.DataFrame()

    perf_comp_res = []
    for score_name, perf in perf_comp_dict.items():
        perf = perf.copy()
        perf['score_name'] = score_name
        perf_comp_res.append(perf)

    perf_comp_res = pd.concat(perf_comp_res)

    drop_cols = ['AUC_Shift', 'KS_Shift']
    for col in drop_cols:
        if col in perf_comp_res.columns:
            perf_comp_res = perf_comp_res.drop(columns=[col])

    order_map = {val: i for i, val in enumerate(score_order) if val in perf_comp_dict}
    perf_comp_res['sort_key'] = perf_comp_res['score_name'].map(order_map)
    perf_comp_res_sorted = perf_comp_res.sort_values('sort_key').drop('sort_key', axis=1)

    return perf_comp_res_sorted

get_gains_summary

get_gains_summary(data: Optional[DataFrame] = None, grp_name: Optional[str] = None, disp: bool = True, grp_disp_metric: List[str] = None, grp_nbins: int = 5, withSummary: bool = True, add_func: Optional[Callable] = None, sync_range: bool = True, spec_values=None, include_missing: Optional[bool] = None, fillna=None) -> DataFrame

Generate gains summary table for score analysis.

This method creates gains tables via GainsTableCalculator, showing the distribution of targets across score bins, with optional group stratification. Custom metrics can be injected through add_func.

参数:

名称 类型 描述 默认
data DataFrame

Data to use. If None, uses self.data.

None
grp_name str

Group name for stratified analysis.

None
disp bool

Whether to display results. Default is True.

True
grp_disp_metric list

Metrics to display for grouped analysis.

None
grp_nbins int

Number of bins for grouped analysis. Default is 5.

5
withSummary bool

Include summary row. Default is True.

True
add_func callable

Custom metric function merged into each gains table.

None
sync_range bool

Synchronize bin ranges. Default is True.

True
spec_values list

Special values to treat separately during binning.

None
include_missing bool

Whether to include missing values. Defaults to the instance setting.

None
fillna any

Missing-value fill for score binning. Defaults to the instance setting.

None

返回:

类型 描述
DataFrame

Gains summary results with score names.

源代码位于: Modeling_Tool/Eval/Evaluation_Tool.py
def get_gains_summary(
    self,
    data: Optional[pd.DataFrame] = None,
    grp_name: Optional[str] = None,
    disp: bool = True,
    grp_disp_metric: List[str] = None,
    grp_nbins: int = 5,
    withSummary: bool = True,
    add_func: Optional[Callable] = None,
    sync_range: bool = True,
    spec_values=None,
    include_missing: Optional[bool] = None,
    fillna=None,
) -> pd.DataFrame:
    """
    Generate gains summary table for score analysis.

    This method creates gains tables via ``GainsTableCalculator``, showing the
    distribution of targets across score bins, with optional group stratification.
    Custom metrics can be injected through ``add_func``.

    Parameters
    ----------
    data : pandas.DataFrame, optional
        Data to use. If None, uses self.data.
    grp_name : str, optional
        Group name for stratified analysis.
    disp : bool, optional
        Whether to display results. Default is True.
    grp_disp_metric : list, optional
        Metrics to display for grouped analysis.
    grp_nbins : int, optional
        Number of bins for grouped analysis. Default is 5.
    withSummary : bool, optional
        Include summary row. Default is True.
    add_func : callable, optional
        Custom metric function merged into each gains table.
    sync_range : bool, optional
        Synchronize bin ranges. Default is True.
    spec_values : list, optional
        Special values to treat separately during binning.
    include_missing : bool, optional
        Whether to include missing values. Defaults to the instance setting.
    fillna : any, optional
        Missing-value fill for score binning. Defaults to the instance setting.

    Returns
    -------
    pandas.DataFrame
        Gains summary results with score names.
    """
    if grp_disp_metric is None:
        grp_disp_metric = ['N', 'PROP', 'AVG_BAD', 'LIFT']

    if fillna is None:
        fillna = self.fillna
    if spec_values is None:
        spec_values = self.spec_values
    if include_missing is None:
        include_missing = self.include_missing

    new_oot_df = self.data.copy() if data is None else data.copy()

    dep = self.dep
    base_score = self.base_score
    min_data_size = self.min_data_size
    nbins = self.nbins if grp_name is None else grp_nbins

    score_list = self._score_order()
    display_metric_list = self.gains_display_metric_list if add_func is None else None

    if self.data is None or len(self.data) == 0:
        empty_df = pd.DataFrame()
        empty_df.index = pd.MultiIndex.from_tuples([], names=['_bin_num', '_bin_range'])
        return empty_df

    gains_table_dict = {}
    for score in score_list:
        if score not in new_oot_df.columns:
            continue

        calculator = self._build_gains_calculator(
            data=new_oot_df,
            score=score,
            nbins=nbins,
            fillna=fillna,
            spec_values=spec_values,
            include_missing=include_missing,
            ascending=True,
        )

        if grp_name is None:
            gains_table_dict[score] = calculator.calculate(
                withSummary=withSummary,
                add_func=add_func,
            )
            if display_metric_list is not None:
                gains_table_dict[score] = gains_table_dict[score][display_metric_list]
            gains_table_dict[score] = gains_table_dict[score].reset_index(drop=False)
        else:
            if new_oot_df.query(f"{score} > 0").shape[0] > 10:
                oot_by_group = calculator.calculate(
                    grp_name=grp_name,
                    min_data_size=min_data_size,
                    sync_range=sync_range,
                    retSummary=False,
                    withSummary=withSummary,
                    add_func=add_func,
                )
                if display_metric_list is not None:
                    oot_by_group = oot_by_group[[*display_metric_list, grp_name]]
                oot_by_group = oot_by_group.reset_index(drop=False)

                grp_metric_dict = {}
                for value in [x for x in grp_disp_metric if x not in ['PROP']]:
                    grp_metric_dict[value] = oot_by_group\
                        .reset_index(drop=False)\
                        .pivot_table(
                            index=['_bin_num', '_bin_range'],
                            columns=[grp_name],
                            values=value,
                            margins=True,
                            margins_name="Grand_Total",
                        )

                if 'PROP' in grp_disp_metric:
                    tot_cnt = oot_by_group\
                        .reset_index(drop=False)\
                        .pivot_table(
                            index=['_bin_num', '_bin_range'],
                            columns=[grp_name],
                            values=['N'],
                            margins=True,
                            margins_name="Grand_Total",
                            aggfunc='sum',
                        )

                    if tot_cnt.shape[0] > 0:
                        grp_metric_dict['PROP'] = tot_cnt.iloc[0:tot_cnt.shape[0]:] / np.array(
                            tot_cnt.iloc[-1:].T['Grand_Total'].tolist()
                        )
                        grp_metric_dict['PROP'].columns = grp_metric_dict['PROP'].rename(
                            columns={'N': ''}
                        ).columns.map("".join)

                fnl_grp_gains_res = []
                for colname, gains in grp_metric_dict.items():
                    gains = gains.copy()
                    gains['variable'] = colname
                    if disp:
                        from IPython.display import display
                        display(gains)
                    fnl_grp_gains_res.append(gains)
                fnl_grp_gains_res = pd.concat(fnl_grp_gains_res)

                gains_table_dict[score] = oot_by_group

    fnl_gains_res = []
    for score_name, gains_res in gains_table_dict.items():
        gains_res = gains_res.copy()
        gains_res['score_name'] = score_name
        fnl_gains_res.append(gains_res)

    if not fnl_gains_res:
        return pd.DataFrame()

    return pd.concat(fnl_gains_res)

get_cross_risk_summary

get_cross_risk_summary(cross_agg_dict: Optional[Dict] = None, nbins: int = 5, equal_freq: Optional[bool] = None, disp: bool = True, spec_values=None, binning_numeric=None) -> DataFrame

Generate cross-risk analysis summary between base and comparison scores.

This method creates a comprehensive cross-tabulation showing risk metrics across different score ranges for both base and comparison scores.

参数:

名称 类型 描述 默认
cross_agg_dict dict

Dictionary of column names and aggregation functions.

None
nbins int

Number of bins. Default is 5.

5
equal_freq bool

Use equal frequency binning. If None, uses self.equal_freq.

None
disp bool

Whether to display results. Default is True.

True
spec_values list

Special values to keep separate during binning.

None
binning_numeric bool or list/tuple of bool

Whether numeric score columns should be binned before cross-risk aggregation. If None, defaults to [True, True] for backward compatibility. Passing a bool applies the same setting to both base and comparison score.

None

返回:

类型 描述
DataFrame

Cross-risk summary with base score range, comparison score range, and metrics.

源代码位于: Modeling_Tool/Eval/Evaluation_Tool.py
def get_cross_risk_summary(
    self,
    cross_agg_dict: Optional[Dict] = None,
    nbins: int = 5,
    equal_freq: Optional[bool] = None,
    disp: bool = True,
    spec_values=None,
    binning_numeric=None,
) -> pd.DataFrame:
    """
    Generate cross-risk analysis summary between base and comparison scores.

    This method creates a comprehensive cross-tabulation showing risk metrics
    across different score ranges for both base and comparison scores.

    Parameters
    ----------
    cross_agg_dict : dict, optional
        Dictionary of column names and aggregation functions.
    nbins : int, optional
        Number of bins. Default is 5.
    equal_freq : bool, optional
        Use equal frequency binning. If None, uses self.equal_freq.
    disp : bool, optional
        Whether to display results. Default is True.
    spec_values : list, optional
        Special values to keep separate during binning.
    binning_numeric : bool or list/tuple of bool, optional
        Whether numeric score columns should be binned before cross-risk
        aggregation. If None, defaults to [True, True] for backward
        compatibility. Passing a bool applies the same setting to both
        base and comparison score.

    Returns
    -------
    pandas.DataFrame
        Cross-risk summary with base score range, comparison score range, and metrics.
    """
    data = self.data.copy()
    dep = self.dep
    min_bin_prop = self.min_bin_prop
    precision = self.precision
    tree_binning = self.tree_binning
    equal_freq = equal_freq if equal_freq is not None else self.equal_freq
    fillna = self.missing_rate_ref
    include_missing = self.include_missing

    base_score = self.base_score
    compare_scrlist = self.comp_scrlist

    if cross_agg_dict is None:
        cross_agg_dict = self.cross_agg_dict.copy()
    if spec_values is None:
        spec_values = self.spec_values
    if binning_numeric is None:
        binning_numeric = [True, True]
    elif isinstance(binning_numeric, (bool, np.bool_)):
        binning_numeric = [bool(binning_numeric), bool(binning_numeric)]
    elif isinstance(binning_numeric, (list, tuple)):
        if len(binning_numeric) != 2:
            raise ValueError(
                "binning_numeric must be a bool or a two-element list/tuple."
            )
        if not all(isinstance(value, (bool, np.bool_)) for value in binning_numeric):
            raise TypeError("binning_numeric values must be booleans.")
        binning_numeric = [bool(binning_numeric[0]), bool(binning_numeric[1])]
    else:
        raise TypeError("binning_numeric must be None, a bool, or a two-element list/tuple.")

    def _ensure_three_level_columns(frame: pd.DataFrame) -> pd.DataFrame:
        frame = frame.copy()
        if isinstance(frame.columns, pd.MultiIndex):
            normalized_columns = []
            for col in frame.columns:
                col_tuple = tuple(col)
                if len(col_tuple) < 3:
                    col_tuple = col_tuple + ("",) * (3 - len(col_tuple))
                elif len(col_tuple) > 3:
                    col_tuple = col_tuple[:2] + ("_".join(map(str, col_tuple[2:])),)
                normalized_columns.append(col_tuple)
            frame.columns = pd.MultiIndex.from_tuples(normalized_columns)
        else:
            frame.columns = pd.MultiIndex.from_tuples(
                [(col, "", "") for col in frame.columns]
            )
        return frame

    multi_scr_res = {}
    for compare_scr in compare_scrlist:
        logger.info(compare_scr)
        score_list = [base_score, compare_scr]
        score_query = " and ".join([s + " > 0" for s in score_list])

        prepared_data = data.query(score_query)

        if prepared_data.empty:
            return pd.DataFrame()  # 或跳过该组

        tot_cnt = prepared_data[score_list].shape[0]
        cross_agg_dict_copy = cross_agg_dict.copy()
        cross_agg_dict_copy.update({'flow_id': ['count', lambda x: x.count() / tot_cnt]})

        cross_res = {}
        for colname, aggfunc in cross_agg_dict_copy.items():
            if data.shape[0] > nbins:
                cross_res[colname] = cross_risk(
                    data=prepared_data,
                    score_list=[base_score, compare_scr],
                    dep=dep,
                    nbins=nbins,
                    agg_col=colname,
                    precision=precision,
                    min_bin_prop=min_bin_prop,
                    include_missing=include_missing,
                    equal_freq=equal_freq,
                    binning_numeric=binning_numeric,
                    tree_binning=tree_binning,
                    agg_func=aggfunc,
                    fillna=fillna,
                    spec_values=spec_values
                )

        fnl_res = []
        for colname, res in cross_res.items():
            res = _ensure_three_level_columns(res)
            res[('', 'eval_metric', '')] = colname
            res = res.rename(columns={"<lambda>": "", "count": "n"})
            if disp:
                from IPython.display import display
                display(res)
            fnl_res.append(res)

        fnl_res = pd.concat(fnl_res)

        fnl_res[('', 'score_name', '')] = compare_scr
        multi_scr_res[compare_scr] = fnl_res

    combined_cross_res = []
    for score_name, cross_data in multi_scr_res.items():
        cross_data = cross_data.copy()
        cross_data.columns = cross_data.columns.map(
            lambda x: f"{x[0]}_{x[1] if len(str(x[1])) >= 2 else ('0' + str(x[1]))}_{x[2]}".strip("_")
        )
        cross_data.index = cross_data.index.map(
            lambda x: f"{x[0] if len(str(x[0])) >= 2 else ('0' + str(x[0]))}_{x[1]}".strip("_")
        )
        cross_data = cross_data.reset_index(drop=False).melt(
            id_vars=['index', 'eval_metric', 'score_name'],
            var_name='comp_scr_range'
        ).rename(columns={"index": "base_scr_range"})
        combined_cross_res.append(cross_data)
    combined_cross_res = pd.concat(combined_cross_res)

    return combined_cross_res

multi_subset_wrapper

multi_subset_wrapper(condition_dict: Optional[Dict[str, str]] = None, subset_var_name: str = 'eval_subset', func: Optional[Callable] = None, min_subset_size=10, **kwargs) -> DataFrame

Apply evaluation function across multiple data subsets.

This method iterates over predefined subset conditions, filters the data accordingly, and applies the evaluation function to each subset.

参数:

名称 类型 描述 默认
condition_dict dict

Dictionary mapping subset names to query conditions. If None, uses self.subset_condition_dict.

None
subset_var_name str

Column name for subset identifier in results. Default is 'eval_subset'.

'eval_subset'
func callable

Function to apply to each subset. If None, uses model_perf_compare.

None
**kwargs

Additional keyword arguments passed to the evaluation function.

{}

返回:

类型 描述
DataFrame

Combined results from all subsets.

源代码位于: Modeling_Tool/Eval/Evaluation_Tool.py
    def multi_subset_wrapper(
        self,
        condition_dict: Optional[Dict[str, str]] = None,
        subset_var_name: str = 'eval_subset',
        func: Optional[Callable] = None,
        min_subset_size = 10,
        **kwargs
    ) -> pd.DataFrame:
        """
        Apply evaluation function across multiple data subsets.

        This method iterates over predefined subset conditions, filters the data
        accordingly, and applies the evaluation function to each subset.

        Parameters
        ----------
        condition_dict : dict, optional
            Dictionary mapping subset names to query conditions.
            If None, uses self.subset_condition_dict.
        subset_var_name : str, optional
            Column name for subset identifier in results. Default is 'eval_subset'.
        func : callable, optional
            Function to apply to each subset. If None, uses model_perf_compare.
        **kwargs
            Additional keyword arguments passed to the evaluation function.

        Returns
        -------
        pandas.DataFrame
            Combined results from all subsets.
        """
        if condition_dict is None:
            condition_dict = self.subset_condition_dict

        original_data = self.data.copy()
        if func is None:
            func = self.model_perf_compare

        fnl_subset_results = []

        if condition_dict:
            for group_value, query in condition_dict.items():
                logger.info(f"INFO:: Multi_Subset_Eval: Running Subset {group_value}")

                subset_data = self.data.query(query).copy() if query != "" else self.data.copy()
                subset_data_size = subset_data.shape[0]

                if subset_data_size > min_subset_size:
                    self.data = subset_data

                    subset_result = func(**kwargs)
                    if subset_result is not None and not subset_result.empty:
                        subset_result = subset_result.copy()
                        subset_result[subset_var_name] = group_value
                        fnl_subset_results.append(subset_result)
                    else:
#                         fnl_subset_results.append(pd.DataFrame({subset_var_name: [group_value]}))
                        self.data = original_data
                        continue

                    self.data = original_data


        if fnl_subset_results:
            return pd.concat(fnl_subset_results)

        return pd.DataFrame({subset_var_name: [group_value]})

multi_ylabel_wrapper

multi_ylabel_wrapper(ylabels: Optional[List[str]] = None, ylabel_var_name: str = 'eval_ylabel', eval_func: Optional[Callable] = None, **kwargs) -> DataFrame

Apply evaluation function across multiple target labels.

This method iterates over different target/label columns, temporarily updating the dependent variable for each iteration.

参数:

名称 类型 描述 默认
ylabels list

List of target column names. If None, uses self.eval_ylabels.

None
ylabel_var_name str

Column name for label identifier in results. Default is 'eval_ylabel'.

'eval_ylabel'
eval_func callable

Function to apply for each label. If None, uses model_perf_compare.

None
**kwargs

Additional keyword arguments passed to the evaluation function.

{}

返回:

类型 描述
DataFrame

Combined results from all labels.

源代码位于: Modeling_Tool/Eval/Evaluation_Tool.py
def multi_ylabel_wrapper(
    self,
    ylabels: Optional[List[str]] = None,
    ylabel_var_name: str = 'eval_ylabel',
    eval_func: Optional[Callable] = None,
    **kwargs
) -> pd.DataFrame:
    """
    Apply evaluation function across multiple target labels.

    This method iterates over different target/label columns, temporarily
    updating the dependent variable for each iteration.

    Parameters
    ----------
    ylabels : list, optional
        List of target column names. If None, uses self.eval_ylabels.
    ylabel_var_name : str, optional
        Column name for label identifier in results. Default is 'eval_ylabel'.
    eval_func : callable, optional
        Function to apply for each label. If None, uses model_perf_compare.
    **kwargs
        Additional keyword arguments passed to the evaluation function.

    Returns
    -------
    pandas.DataFrame
        Combined results from all labels.
    """
    original_dep = self.dep
    original_data = self.data.copy()
    if eval_func is None:
        eval_func = self.model_perf_compare

    fnl_ylabel_results = []

    if ylabels is None:
        ylabels = self.eval_ylabels

    for current_dep in ylabels:
        logger.info(f"INFO:: Multi_yLabel_Eval: Running Label {current_dep}")

        input_data = self.data.query(f"{current_dep} == {current_dep}").copy()
        subset_data_size = input_data.shape[0]

        if subset_data_size > 0:
            self.dep = current_dep
            self.data = input_data

            subset_result = eval_func(**kwargs)
            subset_result = subset_result.copy()
            subset_result[ylabel_var_name] = current_dep
            fnl_ylabel_results.append(subset_result)

            self.dep = original_dep
            self.data = original_data

    if fnl_ylabel_results:
        return pd.concat(fnl_ylabel_results)
    return pd.DataFrame()

multi_group_wrapper

multi_group_wrapper(group_name: Optional[str] = None, group_var_name: str = 'group_name', group_eval_func: Optional[Callable] = None, min_subset_size: int = 10, **kwargs) -> DataFrame

Apply evaluation function across multiple groups.

This method splits data by a grouping column and applies the evaluation function to each group separately.

参数:

名称 类型 描述 默认
group_name str

Column name to group by. If None, processes all data together.

None
group_var_name str

Column name for group identifier in results. Default is 'group_name'.

'group_name'
group_eval_func callable

Function to apply to each group. If None, uses model_perf_compare.

None
min_subset_size int

Minimum data size required to process a group. Default is 10.

10
**kwargs

Additional keyword arguments passed to the evaluation function.

{}

返回:

类型 描述
DataFrame

Combined results from all groups.

源代码位于: Modeling_Tool/Eval/Evaluation_Tool.py
def multi_group_wrapper(
    self,
    group_name: Optional[str] = None,
    group_var_name: str = 'group_name',
    group_eval_func: Optional[Callable] = None,
    min_subset_size: int = 10,
    **kwargs
) -> pd.DataFrame:
    """
    Apply evaluation function across multiple groups.

    This method splits data by a grouping column and applies the evaluation
    function to each group separately.

    Parameters
    ----------
    group_name : str, optional
        Column name to group by. If None, processes all data together.
    group_var_name : str, optional
        Column name for group identifier in results. Default is 'group_name'.
    group_eval_func : callable, optional
        Function to apply to each group. If None, uses model_perf_compare.
    min_subset_size : int, optional
        Minimum data size required to process a group. Default is 10.
    **kwargs
        Additional keyword arguments passed to the evaluation function.

    Returns
    -------
    pandas.DataFrame
        Combined results from all groups.
    """
    original_data = self.data.copy()

    if group_name is None:
        return group_eval_func(**kwargs) if group_eval_func else pd.DataFrame()

    group_value_list = list(set(original_data[group_name].unique().tolist()))

    if group_eval_func is None:
        group_eval_func = self.model_perf_compare

    fnl_group_results = []
    for group_value in group_value_list:
        logger.info(f"INFO:: Multi_Group_Eval: Running Group {group_value}")

        input_data = original_data.query(f"{group_name} == '{group_value}'").copy()
        subset_data_size = input_data.shape[0]

        if subset_data_size > min_subset_size:
            self.data = input_data

            subset_result = group_eval_func(**kwargs)
            subset_result = subset_result.copy()
            subset_result[group_var_name] = group_value
            fnl_group_results.append(subset_result)

            self.data = original_data

    if fnl_group_results:
        return pd.concat(fnl_group_results)
    return pd.DataFrame()

multi_dim_eval

multi_dim_eval(condition_dict: Optional[Dict[str, str]] = None, eval_ylabels: Optional[List[str]] = None, grp_namelist: Optional[List[str]] = None, eval_func: Optional[Callable] = None, subset_var_name: str = 'eval_subset', ylabel_var_name: str = 'eval_ylabel', var_name: str = 'group_name', value_name: str = 'group_value', **kwargs) -> DataFrame

Perform multi-dimensional evaluation across subsets, labels, and groups.

This method combines multi_subset_wrapper and multi_ylabel_wrapper to evaluate model performance across different dimensions.

参数:

名称 类型 描述 默认
condition_dict dict

Subset conditions. If None, uses self.subset_condition_dict.

None
eval_ylabels list

Target labels. If None, uses self.eval_ylabels.

None
grp_namelist list

Group columns. If None, uses self.grp_namelist.

None
eval_func callable

Evaluation function. If None, uses model_perf_compare.

None
subset_var_name str

Column name for subset identifier.

'eval_subset'
ylabel_var_name str

Column name for label identifier.

'eval_ylabel'
var_name str

Column name for group name in results.

'group_name'
value_name str

Column name for group value in results.

'group_value'
**kwargs

Additional keyword arguments.

{}

返回:

类型 描述
DataFrame

Multi-dimensional evaluation results.

源代码位于: Modeling_Tool/Eval/Evaluation_Tool.py
def multi_dim_eval(
    self,
    condition_dict: Optional[Dict[str, str]] = None,
    eval_ylabels: Optional[List[str]] = None,
    grp_namelist: Optional[List[str]] = None,
    eval_func: Optional[Callable] = None,
    subset_var_name: str = 'eval_subset',
    ylabel_var_name: str = 'eval_ylabel',
    var_name: str = 'group_name',
    value_name: str = 'group_value',
    **kwargs
) -> pd.DataFrame:
    """
    Perform multi-dimensional evaluation across subsets, labels, and groups.

    This method combines multi_subset_wrapper and multi_ylabel_wrapper to
    evaluate model performance across different dimensions.

    Parameters
    ----------
    condition_dict : dict, optional
        Subset conditions. If None, uses self.subset_condition_dict.
    eval_ylabels : list, optional
        Target labels. If None, uses self.eval_ylabels.
    grp_namelist : list, optional
        Group columns. If None, uses self.grp_namelist.
    eval_func : callable, optional
        Evaluation function. If None, uses model_perf_compare.
    subset_var_name : str, optional
        Column name for subset identifier.
    ylabel_var_name : str, optional
        Column name for label identifier.
    var_name : str, optional
        Column name for group name in results.
    value_name : str, optional
        Column name for group value in results.
    **kwargs
        Additional keyword arguments.

    Returns
    -------
    pandas.DataFrame
        Multi-dimensional evaluation results.
    """
    if condition_dict is None:
        condition_dict = self.subset_condition_dict

    if eval_ylabels is None:
        eval_ylabels = self.eval_ylabels

    if grp_namelist is None:
        grp_namelist = self.grp_namelist

    if eval_func is None:
        eval_func = self.model_perf_compare

    fnl_perf_res = []
    for group_name in grp_namelist:
        logger.info(f"INFO:: Multi_Dim_Eval: Running Group {group_name}")

        sig = inspect.signature(eval_func)
        parameters = sig.parameters

        if 'grp_name' in parameters:
            sub_perf_res = self.multi_subset_wrapper(
                condition_dict=condition_dict,
                subset_var_name=subset_var_name,
                func=self.multi_ylabel_wrapper, 
                ylabels=eval_ylabels, 
                ylabel_var_name=ylabel_var_name,
                eval_func=eval_func, 
                grp_name=group_name,
                **kwargs
            )
        else:
            sub_perf_res = self.multi_subset_wrapper(
                condition_dict=condition_dict,
                subset_var_name=subset_var_name,
                func=self.multi_ylabel_wrapper, 
                ylabels=eval_ylabels, 
                ylabel_var_name=ylabel_var_name,
                eval_func=eval_func, 
                **kwargs
            )

        sub_perf_res = sub_perf_res.copy()
        sub_perf_res[var_name] = group_name
        fnl_perf_res.append(sub_perf_res)

    fnl_perf_res = pd.concat(fnl_perf_res)

    if 'grp_name' in parameters:
        if all(x in fnl_perf_res.columns for x in grp_namelist):
            fnl_perf_res[value_name] = fnl_perf_res[grp_namelist].bfill(axis=1).iloc[:, 0]
            fnl_perf_res = fnl_perf_res.drop(columns=grp_namelist)

    return fnl_perf_res

cross_perf_eval

cross_perf_eval(bad_list: List[str], scr_list: List[str], data: Optional[DataFrame] = None, eval_metric: List[str] = None, melt: bool = True) -> DataFrame

Cross-evaluate model performance across multiple bad flags and scores.

This method calculates performance metrics for all combinations of bad flags and scores, useful for comparing model behavior across different definitions of "bad" outcomes.

参数:

名称 类型 描述 默认
bad_list list

List of bad/negative outcome column names.

必需
scr_list list

List of score column names.

必需
data DataFrame

Data to use. If None, uses self.data.

None
eval_metric list

Metrics to extract. If None, uses ['AUC', 'KS', 'N'].

None
melt bool

Whether to melt the pivot table. Default is True.

True

返回:

类型 描述
DataFrame

Cross performance evaluation results.

源代码位于: Modeling_Tool/Eval/Evaluation_Tool.py
def cross_perf_eval(
    self,
    bad_list: List[str],
    scr_list: List[str],
    data: Optional[pd.DataFrame] = None,
    eval_metric: List[str] = None,
    melt: bool = True
) -> pd.DataFrame:
    """
    Cross-evaluate model performance across multiple bad flags and scores.

    This method calculates performance metrics for all combinations of
    bad flags and scores, useful for comparing model behavior across
    different definitions of "bad" outcomes.

    Parameters
    ----------
    bad_list : list
        List of bad/negative outcome column names.
    scr_list : list
        List of score column names.
    data : pandas.DataFrame, optional
        Data to use. If None, uses self.data.
    eval_metric : list, optional
        Metrics to extract. If None, uses ['AUC', 'KS', 'N'].
    melt : bool, optional
        Whether to melt the pivot table. Default is True.

    Returns
    -------
    pandas.DataFrame
        Cross performance evaluation results.
    """
    if eval_metric is None:
        eval_metric = ['AUC', 'KS', 'N']

    if data is None:
        data = self.data.copy()

    score_query = " and ".join([s + " > 0" for s in scr_list])
    bad_query = " and ".join([s + " == " + s for s in bad_list])

    data = data.query(bad_query)
    data = data.query(score_query)

    if data.shape[0] > self.nbins:
        fnl_res = []
        for bad in bad_list:
            for scr in scr_list:
                perf = get_perf_summary(
                    None,
                    None,
                    data,
                    tgt_name=bad,
                    scr_name=scr,
                    to_show=False,
                    display=False,
                    dist_bins=self.nbins,
                    pct_bins=self.nbins,
                    precision=self.precision,
                    min_bin_prop=self.min_bin_prop,
                    equal_freq=True
                )
                perf = perf.copy()
                perf['eval_ylabel'] = bad
                perf['score_name'] = scr
                fnl_res.append(perf)

        fnl_res = pd.concat(fnl_res)

        summary = fnl_res.pivot_table(index="score_name", columns='eval_ylabel', values=eval_metric)
        summary.columns = summary.columns.map(lambda x: f"{x[0]}_{x[1]}")

        if melt:
            summary = summary.reset_index(drop=False).melt(id_vars=['score_name'])

        return summary

    return pd.DataFrame(columns=['eval_ylabel', 'score_name'])

run_variable_analysis_summary

run_variable_analysis_summary(varlist: List[str], data: Optional[DataFrame] = None, dep: Optional[str] = None, nbins: Optional[int] = None, equal_freq: Optional[bool] = None, min_bin_prop: Optional[float] = None, precision: Optional[int] = None, chi2_method: Optional[bool] = None, chi2_p: Optional[float] = None, init_equi_bins: Optional[int] = None, tree_binning: Optional[bool] = None, include_missing: Optional[bool] = None, missing_rate_ref: Optional[Union[int, float]] = None, seed: Optional[int] = None, spec_values=[]) -> DataFrame

Run comprehensive variable analysis and generate summary report.

This method performs binning analysis on specified variables, calculating metrics like IV (Information Value) and chi-square statistics.

参数:

名称 类型 描述 默认
varlist list

List of variable names to analyze.

必需
data DataFrame

Data to use. If None, uses self.data.

None
dep str

Target variable name. If None, uses self.dep.

None
nbins int

Number of bins. If None, uses self.nbins.

None
equal_freq bool

Use equal frequency binning. If None, uses self.equal_freq.

None
min_bin_prop float

Minimum bin proportion. If None, uses self.min_bin_prop.

None
precision int

Decimal precision. If None, uses self.precision.

None
chi2_method bool

Use chi-square method. If None, uses self.chi2_method.

None
chi2_p float

Chi-square p-value. If None, uses self.chi2_p.

None
init_equi_bins int

Initial equal bins. If None, uses self.init_equi_bins.

None
tree_binning bool

Use tree binning. If None, uses self.tree_binning.

None
include_missing bool

Include missing values. If None, uses self.include_missing.

None
missing_rate_ref int or float

Missing reference. If None, uses self.missing_rate_ref.

None
seed int

Random seed. If None, uses self.seed.

None

返回:

类型 描述
DataFrame

Variable analysis summary with IV and other metrics.

源代码位于: Modeling_Tool/Eval/Evaluation_Tool.py
def run_variable_analysis_summary(
    self,
    varlist: List[str],
    data: Optional[pd.DataFrame] = None,
    dep: Optional[str] = None,
    nbins: Optional[int] = None,
    equal_freq: Optional[bool] = None,
    min_bin_prop: Optional[float] = None,
    precision: Optional[int] = None,
    chi2_method: Optional[bool] = None,
    chi2_p: Optional[float] = None,
    init_equi_bins: Optional[int] = None,
    tree_binning: Optional[bool] = None,
    include_missing: Optional[bool] = None,
    missing_rate_ref: Optional[Union[int, float]] = None,
    seed: Optional[int] = None,
    spec_values = []
) -> pd.DataFrame:
    """
    Run comprehensive variable analysis and generate summary report.

    This method performs binning analysis on specified variables,
    calculating metrics like IV (Information Value) and chi-square statistics.

    Parameters
    ----------
    varlist : list
        List of variable names to analyze.
    data : pandas.DataFrame, optional
        Data to use. If None, uses self.data.
    dep : str, optional
        Target variable name. If None, uses self.dep.
    nbins : int, optional
        Number of bins. If None, uses self.nbins.
    equal_freq : bool, optional
        Use equal frequency binning. If None, uses self.equal_freq.
    min_bin_prop : float, optional
        Minimum bin proportion. If None, uses self.min_bin_prop.
    precision : int, optional
        Decimal precision. If None, uses self.precision.
    chi2_method : bool, optional
        Use chi-square method. If None, uses self.chi2_method.
    chi2_p : float, optional
        Chi-square p-value. If None, uses self.chi2_p.
    init_equi_bins : int, optional
        Initial equal bins. If None, uses self.init_equi_bins.
    tree_binning : bool, optional
        Use tree binning. If None, uses self.tree_binning.
    include_missing : bool, optional
        Include missing values. If None, uses self.include_missing.
    missing_rate_ref : int or float, optional
        Missing reference. If None, uses self.missing_rate_ref.
    seed : int, optional
        Random seed. If None, uses self.seed.

    Returns
    -------
    pandas.DataFrame
        Variable analysis summary with IV and other metrics.
    """
    if data is None:
        data = self.data.copy()

    if dep is None:
        dep = self.dep

    nbins = nbins if nbins is not None else self.nbins
    equal_freq = equal_freq if equal_freq is not None else self.equal_freq
    min_bin_prop = min_bin_prop if min_bin_prop is not None else self.min_bin_prop
    precision = precision if precision is not None else self.precision
    chi2_method = chi2_method if chi2_method is not None else self.chi2_method
    chi2_p = chi2_p if chi2_p is not None else self.chi2_p
    init_equi_bins = init_equi_bins if init_equi_bins is not None else self.init_equi_bins
    tree_binning = tree_binning if tree_binning is not None else self.tree_binning
    include_missing = include_missing if include_missing is not None else self.include_missing
    missing_rate_ref = missing_rate_ref if missing_rate_ref is not None else self.missing_rate_ref
    seed = seed if seed is not None else self.seed

    from Modeling_Tool.Feature.Feature_Insights import VarExtractionInsights

    varInsights = VarExtractionInsights(
        data=data,
        dep=dep,
        plot_path=None,
        nbins=nbins,
        equal_freq=equal_freq,
        min_bin_prop=min_bin_prop,
        precision=precision,
        chi2_method=chi2_method,
        chi2_p=chi2_p,
        init_equi_bins=init_equi_bins,
        tree_binning=tree_binning,
        include_missing=include_missing,
        seed=seed,
        missing_rate_ref=missing_rate_ref,
        spec_values=spec_values
    )

    data = data.copy()
    for var in varlist:
        data[var] = data[var].fillna(missing_rate_ref)

    var_summary = varInsights.get_var_analysis_report(data=data, varlist=varlist, dep=dep, iv_cut=0)

    return var_summary

pipe

pipe(data=None)

返回一个 EvaluationPipeline 对象,用于链式分组和子集评估。

参数:

名称 类型 描述 默认
data DataFrame

指定数据,默认使用 self.data

None

返回:

类型 描述
EvaluationPipeline
源代码位于: Modeling_Tool/Eval/Evaluation_Tool.py
def pipe(self, data=None):
        """
        返回一个 EvaluationPipeline 对象,用于链式分组和子集评估。

        Parameters
        ----------
        data : pd.DataFrame, optional
            指定数据,默认使用 self.data

        Returns
        -------
        EvaluationPipeline
        """
        return EvaluationPipeline(self, data)

单/多模型绘图 — evaluate_model

evaluate_model

calc_pr

calc_pr(y_true, y_score, sample_weight=None)

计算P-R曲线相关统计量. 基于sklearn.metrics.precision_recall_curve

参数:

名称 类型 描述 默认
y_true

实际样本标签序列, 只接受0-1

必需
y_score

预测概率值序列

必需

返回:

名称 类型 描述
pr_df DataFrame

PR相关的Precision、Recall、Thresholds等统计量数据集

源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def calc_pr(y_true, y_score, sample_weight=None):
    """计算P-R曲线相关统计量.
    基于sklearn.metrics.precision_recall_curve

    Parameters
    ----------
    y_true: array like
        实际样本标签序列, 只接受0-1
    y_score: array like
        预测概率值序列

    Returns
    -------
    pr_df: pandas.DataFrame
        PR相关的Precision、Recall、Thresholds等统计量数据集
    """
    if sample_weight is not None:
        return _weighted_eval.calc_pr(y_true, y_score, sample_weight=sample_weight)

    pr_df = pd.DataFrame(precision_recall_curve(y_true, y_score)).T
    pr_df.columns = ['precision', 'recall', 'thresholds']
    pr_df['thresholds_percentile'] = [100 * np.mean(y_score <= x) for x in pr_df['thresholds']] 

    return pr_df

summarize_pr

summarize_pr(pr_df)

统计P-R曲线信息. 统计量如下: 1.平衡点(Break-Even Point, 简称BEP)阈值及对应Precision、Recall等统计量

参数:

名称 类型 描述 默认
pr_df

PR相关的Precision、Recall、Thresholds等统计量数据集

必需

返回:

名称 类型 描述
pr_info dict

P-R曲线统计信息字典

源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def summarize_pr(pr_df):
    """统计P-R曲线信息.
    统计量如下:
    1.平衡点(Break-Even Point, 简称BEP)阈值及对应Precision、Recall等统计量

    Parameters
    ----------
    pr_df: pandas.DataFrame
        PR相关的Precision、Recall、Thresholds等统计量数据集

    Returns
    -------
    pr_info: dict
        P-R曲线统计信息字典
    """
    equalind = np.argmin(abs(pr_df['precision'] - pr_df['recall']))
    pr_info = {
        'bep_index': equalind, 
        'bep_threshold': pr_df['thresholds'][equalind], 
        'bep_precision': pr_df['precision'][equalind],
        'bep_recall': pr_df['recall'][equalind],
        }

    return pr_info

plot_pr_curve

plot_pr_curve(pr_dfs, square_figsize=8, to_show=True, save_path=None)

绘制P-R曲线图.

参数:

名称 类型 描述 默认
pr_dfs

单个或多个Score名下的PR数据集. 键值对格式为: {name: pr_df}

必需
square_figsize

正方形图边英寸. 默认值为8

8
to_show

是否展示图片. 默认为True

True
save_path

结果图片存放文件地址. 默认值为None, 即不保存

None
源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def plot_pr_curve(pr_dfs,  square_figsize=8, to_show=True, save_path=None):
    """绘制P-R曲线图.

    Parameters
    ----------
    pr_dfs: Dict
        单个或多个Score名下的PR数据集. 键值对格式为: {name: pr_df}
    square_figsize: float
        正方形图边英寸. 默认值为8
    to_show: bool
        是否展示图片. 默认为True
    save_path: str
        结果图片存放文件地址. 默认值为None, 即不保存
    """
    plt.figure(figsize=(square_figsize, square_figsize))
    plt.suptitle('P-R Curve', fontsize=20, fontweight='bold') #, findfont=zhfont)
    ax = plt.subplot(1,1,1)

    models = list(pr_dfs.keys())
    if len(models) == 1:
        pr_df = pr_dfs[models[0]]
        __plot_single_pr_axes(pr_df, ax)
    else:
        __plot_multi_pr_axes(pr_dfs, ax)
    if to_show:
        plt.show()
    if bool(save_path):
        plt.savefig(save_path, bbox_inches='tight')
    plt.close()

calc_roc

calc_roc(y_true, y_score, sample_weight=None)

计算ROC曲线相关统计量. 基于sklearn.metrics.roc_curve

参数:

名称 类型 描述 默认
y_true

实际样本标签序列, 只接受0-1

必需
y_score

预测概率值序列

必需

返回:

名称 类型 描述
roc_df DataFrame

ROC相关的TPR、FPR、Thresholds等统计量数据集

源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def calc_roc(y_true, y_score, sample_weight=None):
    """计算ROC曲线相关统计量.
    基于sklearn.metrics.roc_curve

    Parameters
    ----------
    y_true: array like
        实际样本标签序列, 只接受0-1
    y_score: array like
        预测概率值序列

    Returns
    -------
    roc_df: pandas.DataFrame
        ROC相关的TPR、FPR、Thresholds等统计量数据集
    """

    if sample_weight is not None:
        return _weighted_eval.calc_roc(y_true, y_score, sample_weight=sample_weight)

    # 移除无效值
    mask = np.isfinite(y_score) & np.isfinite(y_true)
    y_true_clean = np.array(y_true)[mask]
    y_score_clean = np.array(y_score)[mask]

    if len(y_true_clean) == 0:
        # 返回一个空的 ROC DataFrame
        return pd.DataFrame(columns=['fpr', 'tpr', 'thresholds', 'thresholds_percentile'])

    roc_df = pd.DataFrame(roc_curve(y_true_clean, y_score_clean)).T
    roc_df.columns = ['fpr', 'tpr', 'thresholds']
    roc_df['thresholds_percentile'] = [100 * np.mean(y_score_clean <= x) for x in roc_df['thresholds']]

    return roc_df

summarize_roc

summarize_roc(roc_df)

统计ROC曲线信息. 统计量如下: 1. AUC 2. KS及其对应阈值

参数:

名称 类型 描述 默认
roc_df

ROC相关的TPR、FPR、Thresholds等统计量数据集

必需

返回:

名称 类型 描述
roc_info dict

ROC曲线统计信息字典

源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def summarize_roc(roc_df):
    """统计ROC曲线信息.
    统计量如下: 
    1. AUC
    2. KS及其对应阈值

    Parameters
    ----------
    roc_df: pandas.DataFrame
        ROC相关的TPR、FPR、Thresholds等统计量数据集

    Returns
    -------
    roc_info: dict
        ROC曲线统计信息字典
    """

    if roc_df.empty:
        return {'auc': np.nan, 'ks_index': np.nan, 'ks_threshold': np.nan, 'ks': np.nan}

    f = roc_df['tpr'] - roc_df['fpr']
    roc_info = {
        'auc': auc(roc_df['fpr'], roc_df['tpr']),
        'ks_index': np.argmax(f),
        'ks_threshold': roc_df['thresholds'][np.argmax(f)],
        'ks': max(abs(f)),
        }

    return roc_info

plot_ks_curve

plot_ks_curve(roc_df, square_figsize=8, to_show=True, save_path=None)

绘制KS曲线图. 只能绘制单个Score的KS曲线.

参数:

名称 类型 描述 默认
roc_df

ROC相关的TPR、FPR、Thresholds等统计量数据集

必需
square_figsize

正方形图边英寸. 默认值为8

8
to_show

是否展示图片. 默认为True

True
save_path

结果图片存放文件地址. 默认值为None, 即不保存

None
源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def plot_ks_curve(roc_df, square_figsize=8, to_show=True, save_path=None):
    """绘制KS曲线图.
    只能绘制单个Score的KS曲线.

    Parameters
    ----------
    roc_df: pandas.DataFrame
        ROC相关的TPR、FPR、Thresholds等统计量数据集
    square_figsize: float
        正方形图边英寸. 默认值为8
    to_show: bool
        是否展示图片. 默认为True
    save_path: str
        结果图片存放文件地址. 默认值为None, 即不保存
    """
    plt.figure(figsize=(square_figsize, square_figsize))
    plt.suptitle('KS Curve', fontsize=20, fontweight='bold') #, findfont=zhfont)
    ax = plt.subplot(1,1,1)
    __plot_ks_axes(roc_df, ax)
    if to_show:
        plt.show()
    if bool(save_path):
        plt.savefig(save_path, bbox_inches='tight')
    plt.close()

plot_roc_curve

plot_roc_curve(roc_dfs, square_figsize=8, fontdicts=fontdicts['main'], to_show=True, save_path=None)

绘制ROC曲线图. 可绘制单个或多个Score的曲线.

参数:

名称 类型 描述 默认
roc_dfs

单个或多个Score名下的ROC相关统计量字典 键值对格式为: {name: roc_df}

必需
square_figsize

正方形图边英寸. 默认值为8

8
to_show

是否展示图片. 默认为True

True
save_path

结果图片存放文件地址. 默认值为None, 即不保存

None
源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def plot_roc_curve(roc_dfs, square_figsize=8, fontdicts=fontdicts['main'], to_show=True, save_path=None):
    """绘制ROC曲线图.
    可绘制单个或多个Score的曲线.

    Parameters
    ----------
    roc_dfs: dict
        单个或多个Score名下的ROC相关统计量字典
        键值对格式为: {name: roc_df}
    square_figsize: float
        正方形图边英寸. 默认值为8
    to_show: bool
        是否展示图片. 默认为True
    save_path: str
        结果图片存放文件地址. 默认值为None, 即不保存
    """
    plt.figure(figsize=(square_figsize, square_figsize))
    plt.suptitle('ROC Curve', fontsize=fontdicts['suptitle']['size'], fontweight=fontdicts['suptitle']['weight'])
    ax = plt.subplot(1,1,1)
    models = list(roc_dfs.keys())
    if len(models) == 1:
        roc_df = roc_dfs[models[0]]
        __plot_single_roc_axes(roc_df, ax, fontdicts)
    else:
        __plot_multi_roc_axes(roc_dfs, ax, fontdicts)
    if to_show:
        plt.show()
    if bool(save_path):
        plt.savefig(save_path, bbox_inches='tight')
    plt.close()

plot_kde_curve

plot_kde_curve(y_true, y_score_dict, bins=20, square_figsize=8, fontdicts=fontdicts['main'], to_show=True, save_path=None)

绘制Score核密度估计(kernel density estimate, 简称KDE)曲线.

参数:

名称 类型 描述 默认
y_true

实际样本标签序列, 只接受0-1

必需
y_score_dict

单个或多个Score序列字典. 键值对格式为: {name: Score}

必需
bins

分组数

20
square_figsize

正方形图边英寸. 默认值为8

8
fontdicts

绘图相关字体字典

fontdicts['main']
to_show

是否展示图片. 默认为True

True
save_path

结果图片存放文件地址. 默认值为None, 即不保存

None
源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def plot_kde_curve(y_true, y_score_dict, bins=20, square_figsize=8, fontdicts=fontdicts['main'], to_show=True, save_path=None):
    """绘制Score核密度估计(kernel density estimate, 简称KDE)曲线.

    Parameters
    ----------
    y_true: array like
        实际样本标签序列, 只接受0-1
    y_score_dict: dict
        单个或多个Score序列字典. 键值对格式为: {name: Score}
    bins: int
        分组数
    square_figsize: float
        正方形图边英寸. 默认值为8
    fontdicts: dict
        绘图相关字体字典
    to_show: bool
        是否展示图片. 默认为True
    save_path: str
        结果图片存放文件地址. 默认值为None, 即不保存
    """
    y_true = np.array(y_true)
    plt.figure(figsize=(square_figsize, square_figsize))
    plt.suptitle('Score KDE Curve', fontsize=fontdicts['suptitle']['size'], fontweight=fontdicts['suptitle']['weight']) #, findfont=zhfont)
    ax = plt.subplot(1,1,1)
    models = list(y_score_dict.keys())
    if len(models) == 1:
        y_score = y_score_dict[models[0]]
        y_score = np.array(y_score)
        __plot_single_kde_axes(y_true, y_score, bins, ax, fontdicts)
    else:
        __plot_multi_kde_axes(y_true, y_score_dict, bins, ax, fontdicts)
    if to_show:
        plt.show()
    if bool(save_path):
        plt.savefig(save_path, bbox_inches='tight')
    plt.close()

calc_equid_dist

calc_equid_dist(y_true, y_score, y_group=None, bins=10, sample_weight=None)

将Score等距分组, 计算各组统计量.

参数:

名称 类型 描述 默认
y_true

实际样本标签序列, 只接受0-1

必需
y_score

预测概率值序列

必需
y_group

数据组别序列. 默认为None, 即无组别

None
bins

分组数

10

返回:

名称 类型 描述
dist_df DataFrame

等距分组后各组统计量数据集

源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def calc_equid_dist(y_true, y_score, y_group=None, bins=10, sample_weight=None):
    """将Score等距分组, 计算各组统计量.

    Parameters
    ----------
    y_true: array like
        实际样本标签序列, 只接受0-1
    y_score: array like
        预测概率值序列
    y_group: array like
        数据组别序列. 默认为None, 即无组别
    bins: int
        分组数

    Returns
    -------
    dist_df: pandas.DataFrame
        等距分组后各组统计量数据集
    """
    if sample_weight is not None:
        return _weighted_eval.calc_equid_dist(y_true, y_score, bins=bins, sample_weight=sample_weight)

    y_true = np.array(y_true)
    y_score = np.array(y_score)

    min_score = __calc_digit_min(np.min(y_score))
    max_score = __calc_digit_max(np.max(y_score))
    step = (max_score - min_score) / bins

    binvalues = list(np.arange(min_score, max_score, step))
    labels = binvalues[1:]
    labels.append(max_score)
    binvalues.append(np.inf)
    thresholds = pd.cut(x=y_score, bins=binvalues, right=False, labels=labels)

    if y_group is not None:
        df = pd.DataFrame({'y_true': y_true, 'y_score': y_score, 'thresholds': thresholds, 'y_group': y_group})
    else:
        df = pd.DataFrame({'y_true': y_true, 'y_score': y_score, 'thresholds': thresholds})
    dist_df = __agg(df)

    return dist_df

calc_equid_pct

calc_equid_pct(y_true, y_score, y_group=None, bins=10, ascending=True, sample_weight=None)

将Score严格的等分分组, 计算各组统计量.

参数:

名称 类型 描述 默认
y_true

实际样本标签序列, 只接受0-1

必需
y_score

预测概率值序列

必需
y_group

数据组别序列. 默认为None, 即无组别

None
bins

分组数

10
ascending

y_score是否按升序排序, 默认为True

True

返回:

名称 类型 描述
pct_df DataFrame

等分分组后各组统计量数据集

源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def calc_equid_pct(y_true, y_score, y_group=None, bins=10, ascending=True, sample_weight=None):
    """将Score严格的等分分组, 计算各组统计量.

    Parameters
    ----------
    y_true: array like
        实际样本标签序列, 只接受0-1
    y_score: array like
        预测概率值序列
    y_group: array like
        数据组别序列. 默认为None, 即无组别
    bins: int
        分组数
    ascending: bool
        y_score是否按升序排序, 默认为True

    Returns
    -------
    pct_df: pandas.DataFrame
        等分分组后各组统计量数据集
    """
    if sample_weight is not None:
        return _weighted_eval.calc_equid_pct(y_true, y_score, bins=bins, sample_weight=sample_weight)

    y_true = np.array(y_true)
    y_score = np.array(y_score)
    size = len(y_true)
    binsize = int(size / bins) # 向下取整
    indices = np.argsort(y_score) if ascending else np.argsort(y_score)[::-1] 

    thresholds = np.array([0]*size)
    thresholds_percentile = [100 * (i+1) / bins for i in range(bins)]
    for i in range(bins):
        s = i*binsize
        e = (i+1)*binsize if i < bins-1 else np.max([size, (i+1)*binsize]) # 严格排序
        thresholds[indices[s:e]] = thresholds_percentile[i]
    if bool(y_group):
        df = pd.DataFrame({'y_true': y_true, 'y_score': y_score, 'y_group': y_group, 'thresholds': thresholds})
    else:
        df = pd.DataFrame({'y_true': y_true, 'y_score': y_score, 'thresholds': thresholds})
    pct_df = __agg(df)

    avg_true = np.mean(y_true)
    pct_df['lift'] = [x / avg_true for x in pct_df['cumavg_true']]
    pct_df['gain'] = np.cumsum(pct_df['capture_rate'])

    return pct_df

calc_fixed_pct

calc_fixed_pct(y_true, y_score, y_group=None, bin_edges=None, ascending=True, sample_weight=None)

使用固定Score边界分组, 计算各组统计量.

参数:

名称 类型 描述 默认
y_true

实际样本标签序列, 只接受0-1

必需
y_score

预测概率值序列

必需
y_group

数据组别序列. 默认为None, 即无组别

None
bin_edges

固定分箱边界, 通常来自benchmark数据集

None
ascending

y_score是否按升序分箱. 默认为True

True

返回:

名称 类型 描述
pct_df DataFrame

固定分箱后各组统计量数据集

源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def calc_fixed_pct(y_true, y_score, y_group=None, bin_edges=None, ascending=True, sample_weight=None):
    """使用固定Score边界分组, 计算各组统计量.

    Parameters
    ----------
    y_true: array like
        实际样本标签序列, 只接受0-1
    y_score: array like
        预测概率值序列
    y_group: array like
        数据组别序列. 默认为None, 即无组别
    bin_edges: array like
        固定分箱边界, 通常来自benchmark数据集
    ascending: bool
        y_score是否按升序分箱. 默认为True

    Returns
    -------
    pct_df: pandas.DataFrame
        固定分箱后各组统计量数据集
    """
    if sample_weight is not None:
        return _weighted_eval.calc_fixed_pct(y_true, y_score, sample_weight=sample_weight)

    if bin_edges is None:
        raise ValueError("bin_edges cannot be None when using fixed pct bins.")

    y_true = np.array(y_true)
    y_score = np.array(y_score)
    bin_edges = sorted([np.inf if str(x).lower() == 'inf' else -np.inf if str(x).lower() == '-inf' else x for x in bin_edges])
    n_bins = len(bin_edges) - 1
    labels = [100 * (i + 1) / n_bins for i in range(n_bins)]
    if not ascending:
        labels = labels[::-1]

    thresholds = pd.cut(
        x=y_score,
        bins=bin_edges,
        right=True,
        include_lowest=False,
        labels=labels
    )

    if y_group is not None:
        df = pd.DataFrame({'y_true': y_true, 'y_score': y_score, 'y_group': y_group, 'thresholds': thresholds})
    else:
        df = pd.DataFrame({'y_true': y_true, 'y_score': y_score, 'thresholds': thresholds})

    pct_df = __agg(df)

    avg_true = np.mean(y_true)
    pct_df['lift'] = [x / avg_true for x in pct_df['cumavg_true']]
    pct_df['gain'] = np.cumsum(pct_df['capture_rate'])

    return pct_df

summarize_pct

summarize_pct(pct_df, ascending=True)

统计等分分组信息.

参数:

名称 类型 描述 默认
pct_df

等分分组后各组统计量数据集

必需

返回:

名称 类型 描述
pct_info dict

等分分组统计信息字典

源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def summarize_pct(pct_df, ascending=True):
    """统计等分分组信息.

    Parameters
    ----------
    pct_df: pandas.DataFrame
        等分分组后各组统计量数据集

    Returns
    -------
    pct_info: dict
        等分分组统计信息字典
    """

    if pct_df.empty:
        return {
            'pct_bins': np.nan,
            'pct_interval': np.nan,
            'pct_top_avgTrue': np.nan,
            'pct_btm_avgTrue': np.nan,
        }

    bins = pct_df.shape[0]
    interval = 100 / pct_df.shape[0]

    if ascending:
        pct_info = {
            'pct_bins': bins,
            'pct_interval': interval,
            'pct_top_avgTrue'.format(interval):  pct_df['avg_true'].iloc[bins-1],
            'pct_btm_avgTrue'.format(interval):  pct_df['avg_true'].iloc[0],
        }
    else:
        pct_info = {
            'pct_bins': bins,
            'pct_interval': interval,
            'pct_top_captureRate'.format(interval):  pct_df['capture_rate'].iloc[bins-1],
            'pct_btm_captureRate'.format(interval):  pct_df['capture_rate'].iloc[0],
        }

    return pct_info

plot_dist_curve

plot_dist_curve(dist_dfs, square_figsize=8, fontdicts=fontdicts['main'], to_show=True, save_path=None)

绘制Score分布曲线图.

参数:

名称 类型 描述 默认
dist_dfs

单个或多个Score名下的相同组数, 等距分组后各组统计量数据集. 键值对格式为: {name: dist_df}

必需
square_figsize

正方形图边英寸. 默认值为8

8
fontdicts

绘图相关字体字典

fontdicts['main']
to_show

是否展示图片. 默认为True

True
save_path

结果图片存放文件地址. 默认值为None, 即不保存

None
源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def plot_dist_curve(dist_dfs, square_figsize=8, fontdicts=fontdicts['main'], to_show=True, save_path=None):
    """绘制Score分布曲线图.

    Parameters
    ----------
    dist_dfs: dict
        单个或多个Score名下的相同组数, 等距分组后各组统计量数据集. 键值对格式为: {name: dist_df}
    square_figsize: float
        正方形图边英寸. 默认值为8
    fontdicts: dict
        绘图相关字体字典
    to_show: bool
        是否展示图片. 默认为True
    save_path: str
        结果图片存放文件地址. 默认值为None, 即不保存
    """
    plt.figure(figsize=(square_figsize, square_figsize))
    plt.suptitle('Score Distribution Curve', fontsize=fontdicts['suptitle']['size'], fontweight=fontdicts['suptitle']['weight']) #, findfont=zhfont)
    ax = plt.subplot(1,1,1)
    models = list(dist_dfs.keys())
    if len(models) == 1:
        dist_df = dist_dfs[models[0]]
        if 'y_group' in dist_df.columns:
            __plot_single_stack_dist_axes(dist_df, ax, fontdicts)
        else:
            __plot_single_dist_axes(dist_df, ax, fontdicts)
    else:
        __plot_multi_dist_axes(dist_dfs, ax, fontdicts)
    if bool(save_path):
        plt.savefig(save_path, bbox_inches='tight')
    if to_show:
        plt.show()
    plt.close()

plot_cumdist_curve

plot_cumdist_curve(dist_dfs, square_figsize=8, fontdicts=fontdicts['main'], to_show=True, save_path=None)

绘制Score分布曲线图.

参数:

名称 类型 描述 默认
dist_dfs

单个或多个Score名下的相同组数, 等距分组后各组统计量数据集. 键值对格式为: {name: dist_df}

必需
square_figsize

正方形图边英寸. 默认值为8

8
fontdicts

绘图相关字体字典

fontdicts['main']
to_show

是否展示图片. 默认为True

True
save_path

结果图片存放文件地址. 默认值为None, 即不保存

None
源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def plot_cumdist_curve(dist_dfs, square_figsize=8, fontdicts=fontdicts['main'], to_show=True, save_path=None):
    """绘制Score分布曲线图.

    Parameters
    ----------
    dist_dfs: dict
        单个或多个Score名下的相同组数, 等距分组后各组统计量数据集. 键值对格式为: {name: dist_df}
    square_figsize: float
        正方形图边英寸. 默认值为8
    fontdicts: dict
        绘图相关字体字典
    to_show: bool
        是否展示图片. 默认为True
    save_path: str
        结果图片存放文件地址. 默认值为None, 即不保存
    """
    plt.figure(figsize=(square_figsize, square_figsize))
    plt.suptitle('Score Cumulative Distribution Curve', fontsize=fontdicts['suptitle']['size'], fontweight=fontdicts['suptitle']['weight']) #, findfont=zhfont)
    ax = plt.subplot(1,1,1)
    models = list(dist_dfs.keys())
    if len(models) == 1:
        dist_df = dist_dfs[models[0]]
        if 'y_group' in dist_df.columns:
            __plot_single_stack_cumdist_axes(dist_df, ax, fontdicts)
        else:
            __plot_single_cumdist_axes(dist_df, ax, fontdicts)
    else:
        __plot_multi_cumdist_axes(dist_dfs, ax, fontdicts)
    if bool(save_path):
        plt.savefig(save_path, bbox_inches='tight')
    if to_show:
        plt.show()
    plt.close()

plot_pct_curve

plot_pct_curve(pct_dfs, square_figsize=8, fontdicts=fontdicts['main'], to_show=True, save_path=None)

绘制Score分布曲线图.

参数:

名称 类型 描述 默认
pct_dfs

单个或多个Score名下的等分组后各组统计量数据集. 键值对格式为: {name: pct_df}

必需
square_figsize

正方形图边英寸. 默认值为8

8
to_show

是否展示图片. 默认为True

True
save_path

结果图片存放文件地址. 默认值为None, 即不保存

None
源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def plot_pct_curve(pct_dfs, square_figsize=8, fontdicts=fontdicts['main'], to_show=True, save_path=None):
    """绘制Score分布曲线图.

    Parameters
    ----------
    pct_dfs: dict
        单个或多个Score名下的等分组后各组统计量数据集. 键值对格式为: {name: pct_df}
    square_figsize: float
        正方形图边英寸. 默认值为8
    to_show: bool
        是否展示图片. 默认为True
    save_path: str
        结果图片存放文件地址. 默认值为None, 即不保存
    """
    plt.figure(figsize=(square_figsize, square_figsize))
    plt.suptitle('Score Percentile Curve', fontsize=fontdicts['suptitle']['size'], fontweight=fontdicts['suptitle']['weight']) #, findfont=zhfont)
    ax = plt.subplot(1,1,1)

    models = list(pct_dfs.keys())
    if len(models) == 1:
        pct_df = pct_dfs[models[0]]
        __plot_single_pct_axes(pct_df, ax, fontdicts)
    else:
        __plot_multi_pct_axes(pct_dfs, ax, fontdicts)

    if to_show:
        plt.show()
    if bool(save_path):
        plt.savefig(save_path, bbox_inches='tight')
    plt.close()

plot_cumpct_curve

plot_cumpct_curve(pct_dfs, square_figsize=8, fontdicts=fontdicts['main'], to_show=True, save_path=None)

绘制Score分布曲线图.

参数:

名称 类型 描述 默认
pct_dfs

单个或多个Score名下的等分组后各组统计量数据集. 键值对格式为: {name: pct_df}

必需
square_figsize

正方形图边英寸. 默认值为8

8
to_show

是否展示图片. 默认为True

True
save_path

结果图片存放文件地址. 默认值为None, 即不保存

None
源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def plot_cumpct_curve(pct_dfs, square_figsize=8, fontdicts=fontdicts['main'], to_show=True, save_path=None):
    """绘制Score分布曲线图.

    Parameters
    ----------
    pct_dfs: dict
        单个或多个Score名下的等分组后各组统计量数据集. 键值对格式为: {name: pct_df}
    square_figsize: float
        正方形图边英寸. 默认值为8
    to_show: bool
        是否展示图片. 默认为True
    save_path: str
        结果图片存放文件地址. 默认值为None, 即不保存
    """
    plt.figure(figsize=(square_figsize, square_figsize))
    plt.suptitle('Score Percentile Curve', fontsize=fontdicts['suptitle']['size'], fontweight=fontdicts['suptitle']['weight']) #, findfont=zhfont)
    ax = plt.subplot(1,1,1)

    models = list(pct_dfs.keys())
    if len(models) == 1:
        pct_df = pct_dfs[models[0]]
        __plot_single_cumpct_axes(pct_df, ax, fontdicts)
    else:
        __plot_multi_cumpct_axes(pct_dfs, ax, fontdicts)

    if to_show:
        plt.show()
    if bool(save_path):
        plt.savefig(save_path, bbox_inches='tight')
    plt.close()

plot_gain_curve

plot_gain_curve(pct_dfs, square_figsize=8, fontdicts=fontdicts['main'], to_show=True, save_path=None)

绘制Score分布曲线图.

参数:

名称 类型 描述 默认
pct_dfs

单个或多个Score名下的等分组后各组统计量数据集. 键值对格式为: {name: pct_df}

必需
square_figsize

正方形图边英寸. 默认值为8

8
to_show

是否展示图片. 默认为True

True
save_path

结果图片存放文件地址. 默认值为None, 即不保存

None
源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def plot_gain_curve(pct_dfs, square_figsize=8, fontdicts=fontdicts['main'], to_show=True, save_path=None):
    """绘制Score分布曲线图.

    Parameters
    ----------
    pct_dfs: dict
        单个或多个Score名下的等分组后各组统计量数据集. 键值对格式为: {name: pct_df}
    square_figsize: float
        正方形图边英寸. 默认值为8
    to_show: bool
        是否展示图片. 默认为True
    save_path: str
        结果图片存放文件地址. 默认值为None, 即不保存
    """
    plt.figure(figsize=(square_figsize, square_figsize))
    plt.suptitle('Gain Curve', fontsize=fontdicts['suptitle']['size'], fontweight=fontdicts['suptitle']['weight'])  #, findfont=zhfont)
    ax = plt.subplot(1,1,1)

    models = list(pct_dfs.keys())
    if len(models) == 1:
        pct_df = pct_dfs[models[0]]
        __plot_single_gain_axes(pct_df, ax, fontdicts)
    else:
        __plot_multi_gain_axes(pct_dfs, ax, fontdicts)
    ax.legend(loc=2, fontsize=fontdicts['legend']['size'])

    if to_show:
        plt.show()
    if bool(save_path):
        plt.savefig(save_path, bbox_inches='tight')
    plt.close()

evaluate_performance

evaluate_performance(datasets, dist_bins=20, pct_bins=10, square_figsize=5, fontdicts=fontdicts['sub'], to_show=True, save_path=None, gains_table=True, equal_freq=True, pct_bin_edges=None, sample_weight=None)

绘制单模型预测效果评价图.

参数:

名称 类型 描述 默认
datasets

数据集字典. 键值对格式为: {dataname: {'y_true': y_true, 'y_score': y_score}}

必需
dist_bins

等距分组数. 默认值为20

20
pct_bins

等分分组数. 默认值为10

10
square_figsize

正方形图边英寸. 默认值为8

5
fontdicts

绘图相关字体字典. 默认值为fontdicts['sub']

fontdicts['sub']
to_show

是否展示图片. 默认为True

True
save_path

结果图片存放文件地址. 默认值为None, 即不保存

None

返回:

名称 类型 描述
result_df DataFrame

模型评价指标汇总数据集

源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def evaluate_performance(datasets, dist_bins=20, pct_bins=10, square_figsize=5, fontdicts=fontdicts['sub'], to_show=True, save_path=None, gains_table = True, equal_freq = True, pct_bin_edges = None, sample_weight=None):
    """绘制单模型预测效果评价图.

    Parameters
    ----------
    datasets: pandas.DataFrame
        数据集字典. 键值对格式为: {dataname: {'y_true': y_true, 'y_score': y_score}}
    dist_bins: int
        等距分组数. 默认值为20
    pct_bins: int
        等分分组数. 默认值为10
    square_figsize: float
        正方形图边英寸. 默认值为8
    fontdicts: dict
        绘图相关字体字典. 默认值为fontdicts['sub']
    to_show: bool
        是否展示图片. 默认为True
    save_path: str
        结果图片存放文件地址. 默认值为None, 即不保存

    Returns
    -------
    result_df: pandas.DataFrame
        模型评价指标汇总数据集
    """
    if pct_bin_edges is not None:
        pct_bin_edges = list(pct_bin_edges)

    # 检查是否有足够数据
    for d, data_dict in datasets.items():
        if len(data_dict['y_true']) < 2:
            # 返回一个包含默认列的空 DataFrame
            return pd.DataFrame()

    datas = list(datasets.keys())
    nrow = len(datas)
    ncol = 4
    width = ncol * (square_figsize+1)
    height = nrow * (square_figsize+1)
    plt.figure(figsize=(width, height))
    plt.subplots_adjust(top=1-1/height, wspace=0.2, hspace=0.2)
    title = 'Model Evaluation (Row Dataset: {0})'.format(', '.join(datas)) if nrow > 1 else 'Model Evaluation'
    plt.suptitle(title, fontsize=fontdicts['suptitle']['size'], fontweight=fontdicts['suptitle']['weight']) #, findfont=zhfont)

    result = {}
    for i in range(len(datas)):
        d = datas[i]
        y_true = datasets[d]['y_true']
        y_score = datasets[d]['y_score']
        dataset_weight = datasets[d].get('sample_weight', sample_weight)
        result.update({d: __evaluate_performance(y_true, y_score, nrow, ncol, i, dist_bins, pct_bins, fontdicts, gains_table, equal_freq, pct_bin_edges, sample_weight=dataset_weight)})

    result_df = pd.DataFrame.from_dict(result, orient='index').reset_index()

    if bool(save_path):
        plt.savefig(save_path, bbox_inches='tight')

    if to_show:
        plt.show()

    plt.close('all')

    return result_df

evaluate_distribution

evaluate_distribution(datasets, dist_bins=10, square_figsize=5, fontdicts=fontdicts['sub'], toplot=True, save_path=None)

绘制单模型在多样本集上模型分分布.

参数:

名称 类型 描述 默认
datasets

数据集字典. 键值对格式为: {dataname: {'y_true': y_true, 'y_score': y_score, 'y_group': y_group}}

必需
dist_bins

等距分组数. 默认值为20

10
square_figsize

正方形图边英寸. 默认值为8

5
fontdicts

绘图相关字体字典. 默认值为fontdicts['sub']

fontdicts['sub']
toplot

是否展示图片. 默认为True

True
save_path

结果图片存放文件地址. 默认值为None, 即不保存

None

返回:

名称 类型 描述
result_df DataFrame

模型评价指标汇总数据集

源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def evaluate_distribution(datasets, dist_bins=10, square_figsize=5, fontdicts=fontdicts['sub'], toplot=True, save_path=None):
    """绘制单模型在多样本集上模型分分布.

    Parameters
    ----------
    datasets: pandas.DataFrame
        数据集字典. 键值对格式为: {dataname: {'y_true': y_true, 'y_score': y_score, 'y_group': y_group}}
    dist_bins: int
        等距分组数. 默认值为20
    square_figsize: float
        正方形图边英寸. 默认值为8
    fontdicts: dict
        绘图相关字体字典. 默认值为fontdicts['sub']
    toplot: bool
        是否展示图片. 默认为True
    save_path: str
        结果图片存放文件地址. 默认值为None, 即不保存

    Returns
    -------
    result_df: pandas.DataFrame
        模型评价指标汇总数据集
    """
    datas = list(datasets.keys())
    nrow = len(datas)
    ncol = 2
    width = ncol * (square_figsize+1)
    height = nrow * (square_figsize+1)
    plt.figure(figsize=(width, height))
    plt.subplots_adjust(top=1-1/height, wspace=0.2, hspace=0.2)
    if nrow > 1:
        title = 'Distribution (Row Dataset: {0})'.format(', '.join(datas))
    else:
        title = 'Distribution'
    plt.suptitle(title, fontsize=fontdicts['suptitle']['size'], fontweight=fontdicts['suptitle']['weight']) #, findfont=zhfont)
    for i in range(len(datas)):
        d = datas[i]
        y_true = datasets[d]['y_true']
        y_score = datasets[d]['y_score']
        y_group = datasets[d]['y_group']
        __evaluate_distribution(y_true, y_score, y_group, nrow, ncol, i, dist_bins, fontdicts)
    plt.tight_layout()

    if bool(save_path):
        plt.savefig(save_path)

    if toplot:
        plt.show()

    plt.close('all')

comparison_performance

comparison_performance(datasets, pct_bins=10, square_figsize=5, fontdicts=fontdicts['sub'], to_show=True, save_path=None)

绘制多个模型预测效果对比图.

参数:

名称 类型 描述 默认
datasets

数据集字典. 键值对格式为: {dataname: {'y_true': y_true, 'y_score': y_score, 'y_group': y_group}}

必需
pct_bins

等分分组数. 默认值为10

10
square_figsize

正方形图边英寸. 默认值为5

5
fontdicts

绘图相关字体字典. 默认值为fontdicts['sub']

fontdicts['sub']
to_show

是否展示图片. 默认为True

True
save_path

结果图片存放文件地址. 默认值为None, 即不保存

None

返回:

名称 类型 描述
result_df DataFrame

模型评价指标汇总数据集

源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def comparison_performance(datasets, pct_bins=10, square_figsize=5, fontdicts=fontdicts['sub'], to_show=True, save_path=None):
    """绘制多个模型预测效果对比图.

    Parameters
    ----------
    datasets: pandas.DataFrame
        数据集字典. 键值对格式为: {dataname: {'y_true': y_true, 'y_score': y_score, 'y_group': y_group}}
    pct_bins: int
        等分分组数. 默认值为10
    square_figsize: float
        正方形图边英寸. 默认值为5
    fontdicts: dict
        绘图相关字体字典. 默认值为fontdicts['sub']
    to_show: bool
        是否展示图片. 默认为True
    save_path: str
        结果图片存放文件地址. 默认值为None, 即不保存

    Returns
    -------
    result_df: pandas.DataFrame
        模型评价指标汇总数据集
    """
    datas = list(datasets.keys())
    models = list(datasets[datas[0]]['y_score_dict'].keys())
    nrow = len(datas)
    ncol = 3
    width = ncol * (square_figsize+1)
    height = nrow * (square_figsize+1)
    plt.figure(figsize=(width, height))
    plt.subplots_adjust(top=1-1/height, wspace=0.2, hspace=0.2)
    title = title = '{0} Comparison (Row Dataset: {1})'.format(' vs '.join(models), ', '.join(datas)) if nrow > 1 else '{0} Comparison'.format(' vs '.join(models))
    plt.suptitle(title, fontsize=fontdicts['suptitle']['size'], fontweight=fontdicts['suptitle']['weight']) #, findfont=zhfont)

    result_dfs = []
    for i in range(len(datas)):
        d = datas[i]
        y_true = datasets[d]['y_true']
        y_score_dict = datasets[d]['y_score_dict']
        _result = __comparison_performance(y_true, y_score_dict, nrow, ncol, i, pct_bins, fontdicts)
        _result.loc[:, 'dataset'] = d
        result_dfs.append(_result)

    result_df = pd.concat(result_dfs)

    if bool(save_path):
        plt.savefig(save_path, bbox_inches='tight')

    if to_show:
        plt.show()

    plt.close('all')

    return result_df

calc_lift_apt

calc_lift_apt(y_true, y_score, start, stop, step, score_ascending=True, sample_weight=None)

给定Lift取值范围, 求解Lift表.

参数:

名称 类型 描述 默认
y_true

实际样本标签序列, 只接受0-1

必需
y_score

预测概率值序列

必需
start

起始值

必需
stop

终止值

必需
step

步长

必需
score_ascending

y_score为升序序列, 即数值越大y_true=1可能性越大. 默认为True

True

返回:

名称 类型 描述
lift_df DataFrame

Lift表

源代码位于: Modeling_Tool/Eval/evaluate_model.py
@timeit_decorator
def calc_lift_apt(y_true, y_score, start, stop, step, score_ascending=True, sample_weight=None):
    """给定Lift取值范围, 求解Lift表.

    Parameters
    ----------
    y_true: array like
        实际样本标签序列, 只接受0-1
    y_score: array like
        预测概率值序列
    start: numerical
        起始值
    stop: numerical
        终止值
    step: numerical
        步长
    score_ascending: bool
        y_score为升序序列, 即数值越大y_true=1可能性越大. 默认为True

    Returns
    -------
    lift_df: pandas.DataFrame
        Lift表
    """
    if sample_weight is not None:
        weight = np.asarray(sample_weight, dtype=float)
        int_weight = np.rint(weight).astype(int)
        if np.allclose(weight, int_weight) and np.all(int_weight >= 0):
            repeat_idx = np.repeat(np.arange(len(int_weight)), int_weight)
            y_true = np.asarray(y_true)[repeat_idx]
            y_score = np.asarray(y_score)[repeat_idx]
        else:
            return _weighted_eval.calc_lift_apt(
                y_true=y_true,
                y_score=y_score,
                start=start,
                stop=stop,
                step=step,
                sample_weight=sample_weight,
            )

    # 计算初始等分分组数, 在200组与LiftTable长度中取大
    init_bins = np.max([200, int((stop - start + step) / step)])

    # 根据start和stop值, 判断lift的升降(lift_ascending)
    # 升则stop=1, 降则start=1
    if start < 1:
        stop = 1
    elif start >= 1:
        start = 1
    lift_ascending = start < 1
    lift_df = pd.DataFrame({'lift': np.arange(start, stop + step, step)})

    # 根据分数与Y=1的升降关系(score_ascending)与lift的升降(lift_ascending)判断分数分组的升降序
    # 一致则为升、不一致则为降
    ascending = score_ascending ==  lift_ascending
    equid_df = calc_equid_pct(y_true=y_true, y_score=y_score, y_group=None, bins=init_bins, ascending=ascending)

    # 根据分数升降序, 修正上下组限值, 遵循上组限不在内原则
    if ascending:
        equid_df['lower_limit'] = [-np.inf, ] + list(equid_df['min_score'][1:])
        equid_df['upper_limit'] = list(equid_df['min_score'][1:]) + [np.inf, ]
    else:
        equid_df['lower_limit'] = list(equid_df['min_score'][:-1]) + [-np.inf, ]
        equid_df['upper_limit'] = [np.inf, ] + list(equid_df['min_score'][:-1])
    lift_df = pd.merge(lift_df.assign(key=1), equid_df.assign(key=1), how='left', on='key', suffixes=('', '_actual')).drop(columns=['key'])

    # 根据lift的升降(lift_ascending)求解切分数据
    # 升则取不高于lift的最大值, 降则取不低于lift的最小值
    if lift_ascending:
        cond = lift_df['lift'] >= lift_df['lift_actual'] 
    else:
        cond = lift_df['lift'] <= lift_df['lift_actual']
    lift_df = lift_df.loc[cond, ].reset_index(drop=True)
    lift_df = lift_df.sort_values(by=['lift', 'thresholds'], ascending=[lift_ascending, False])
    lift_df = lift_df.groupby(['lift']).first().reset_index()
    lift_df = lift_df.sort_values(by=['lift'], ascending=lift_ascending)

    # 根据分数排序, 升序时取组上限, 降序时取组下限
    if ascending:
        cols = ['lift', 'lift_actual', 'upper_limit', 'cumsum_n', 'cumsum_proportion', 'cumsum_true', 'cumavg_true']
    else:
        cols = ['lift', 'lift_actual', 'lower_limit', 'cumsum_n', 'cumsum_proportion', 'cumsum_true', 'cumavg_true']
    lift_df = lift_df[cols]

    return lift_df