跳转至

Modeling_Tool.Sample

样本管理层 —— 切分、分层、均衡、拒绝推断、分布适配。

样本切分与采样 — Sample_Split

Sample_Split

Sampling classes for credit modeling.

This module provides classes for splitting samples, stratified sampling, and sample balancing for credit model development.

类:

名称 描述
SampleSplitter : Split data into train/test samples.
StratifiedSampler : Stratified sampling with target balance control.
SampleBalancer : Balance samples using various techniques.

示例:

>>> from Modeling_Tool_refactored.sample import SampleSplitter
>>> splitter = SampleSplitter()
>>> train, test = splitter.split_df(df, 'target', test_size=0.3)

SampleSplitter

Split data into training and testing samples.

This class provides flexible sample splitting with support for stratification, random sampling, and custom split ratios.

参数:

名称 类型 描述 默认
test_size float

Proportion of data for testing (0 to 1).

0.3
random_state int

Random seed for reproducibility.

None
stratify bool

Whether to stratify by target variable.

True

属性:

名称 类型 描述
train_index_ ndarray

Indices for training data.

test_index_ ndarray

Indices for testing data.

方法:

名称 描述
split

Split data into train and test sets.

split_df

Split DataFrame while excluding certain columns.

示例:

>>> splitter = SampleSplitter(test_size=0.2, random_state=42)
>>> X_train, X_test, y_train, y_test = splitter.split(X, y)
>>> print(f"Train size: {len(X_train)}, Test size: {len(X_test)}")
源代码位于: Modeling_Tool/Sample/Sample_Split.py
class SampleSplitter:
    """
    Split data into training and testing samples.

    Thin wrapper around ``train_test_split`` that stores default split
    settings and records which rows ended up on each side of the most
    recent split.

    Parameters
    ----------
    test_size : float, default 0.3
        Proportion of data for testing (0 to 1).
    random_state : int, optional
        Random seed for reproducibility.
    stratify : bool, default True
        Whether to stratify by target variable.

    Attributes
    ----------
    train_index_ : numpy.ndarray or None
        Zero-based row positions (usable with ``iloc``) of the training rows
        from the most recent ``split`` / ``split_df`` call; ``None`` until
        one of them has been called.
    test_index_ : numpy.ndarray or None
        Zero-based row positions of the testing rows, same convention.

    Methods
    -------
    split(X, y, test_size=None, stratify=None)
        Split data into train and test sets.
    split_df(df, target, exclude_cols=None, test_size=None)
        Split DataFrame while excluding certain columns.

    Examples
    --------
    >>> splitter = SampleSplitter(test_size=0.2, random_state=42)
    >>> X_train, X_test, y_train, y_test = splitter.split(X, y)
    >>> print(f"Train size: {len(X_train)}, Test size: {len(X_test)}")
    """

    def __init__(self, test_size: float = 0.3, 
                 random_state: Optional[int] = None,
                 stratify: bool = True):
        """
        Initialize SampleSplitter.

        Parameters
        ----------
        test_size : float, default 0.3
            Proportion for testing.
        random_state : int, optional
            Random seed.
        stratify : bool, default True
            Whether to stratify.
        """
        self.test_size = test_size
        self.random_state = random_state
        self.stratify = stratify
        self.train_index_ = None
        self.test_index_ = None

    def _split_and_record(self, X, y, test_size, stratify_labels):
        """
        Run ``train_test_split`` and store the row positions of each side.

        A positional index array is split together with ``X`` and ``y``;
        the same train/test partition is applied to every array passed in
        one call, so the positions describe exactly the returned rows.
        """
        positions = np.arange(len(y))
        parts = train_test_split(
            X, y, positions,
            test_size=test_size,
            random_state=self.random_state,
            stratify=stratify_labels
        )
        self.train_index_, self.test_index_ = parts[4], parts[5]
        # Keep the return container exactly as train_test_split produced it.
        return parts[:4]

    def split(self, X: Union[pd.DataFrame, np.ndarray],
              y: Union[pd.Series, np.ndarray],
              test_size: Optional[float] = None,
              stratify: Optional[bool] = None) -> Tuple:
        """
        Split data into train and test sets.

        Also updates ``train_index_`` and ``test_index_``.

        Parameters
        ----------
        X : pandas.DataFrame or numpy.ndarray
            Features.
        y : pandas.Series or numpy.ndarray
            Target variable.
        test_size : float, optional
            Override default test size.
        stratify : bool, optional
            Override default stratify setting.

        Returns
        -------
        tuple
            (X_train, X_test, y_train, y_test)

        Examples
        --------
        >>> X_train, X_test, y_train, y_test = splitter.split(X, y)
        """
        test_size = test_size if test_size is not None else self.test_size
        stratify = stratify if stratify is not None else self.stratify

        y_arr = np.array(y)
        # Stratification is skipped when the target has a single class.
        stratify_param = y_arr if stratify and len(np.unique(y_arr)) > 1 else None

        return self._split_and_record(X, y, test_size, stratify_param)

    def split_df(self, df: pd.DataFrame, target: str,
                exclude_cols: Optional[List[str]] = None,
                test_size: Optional[float] = None) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Split DataFrame while excluding certain columns from split.

        Columns listed in ``exclude_cols`` are dropped from both returned
        frames; the target column is re-attached as the last column.
        Also updates ``train_index_`` and ``test_index_``.

        Parameters
        ----------
        df : pandas.DataFrame
            Input DataFrame.
        target : str
            Target column name.
        exclude_cols : list of str, optional
            Columns to exclude from split. A single column name given as a
            plain string is also accepted.
        test_size : float, optional
            Override default test size.

        Returns
        -------
        tuple
            (train_df, test_df)

        Examples
        --------
        >>> train_df, test_df = splitter.split_df(df, 'target', exclude_cols=['id', 'date'])
        """
        test_size = test_size if test_size is not None else self.test_size

        if isinstance(exclude_cols, str):
            exclude_cols = [exclude_cols]
        # Build the exclusion set once instead of once per column.
        excluded = set(exclude_cols or [])
        excluded.add(target)
        feature_cols = [c for c in df.columns if c not in excluded]

        X = df[feature_cols]
        y = df[target]

        strat = y if self.stratify else None

        X_train, X_test, y_train, y_test = self._split_and_record(
            X, y, test_size, strat
        )

        train_df = X_train.copy()
        train_df[target] = y_train

        test_df = X_test.copy()
        test_df[target] = y_test

        return train_df, test_df

split

split(X: Union[DataFrame, ndarray], y: Union[Series, ndarray], test_size: Optional[float] = None, stratify: Optional[bool] = None) -> Tuple

Split data into train and test sets.

参数:

名称 类型 描述 默认
X DataFrame or ndarray

Features.

必需
y Series or ndarray

Target variable.

必需
test_size float

Override default test size.

None
stratify bool

Override default stratify setting.

None

返回:

类型 描述
tuple

(X_train, X_test, y_train, y_test)

示例:

>>> X_train, X_test, y_train, y_test = splitter.split(X, y)
源代码位于: Modeling_Tool/Sample/Sample_Split.py
def split(self, X: Union[pd.DataFrame, np.ndarray],
          y: Union[pd.Series, np.ndarray],
          test_size: Optional[float] = None,
          stratify: Optional[bool] = None) -> Tuple:
    """
    Split data into train and test sets.

    Also stores the zero-based row positions of each side in
    ``self.train_index_`` and ``self.test_index_``.

    Parameters
    ----------
    X : pandas.DataFrame or numpy.ndarray
        Features.
    y : pandas.Series or numpy.ndarray
        Target variable.
    test_size : float, optional
        Override default test size.
    stratify : bool, optional
        Override default stratify setting.

    Returns
    -------
    tuple
        (X_train, X_test, y_train, y_test)

    Examples
    --------
    >>> X_train, X_test, y_train, y_test = splitter.split(X, y)
    """
    test_size = test_size if test_size is not None else self.test_size
    stratify = stratify if stratify is not None else self.stratify

    y_arr = np.array(y)
    # Stratification is skipped when the target has a single class.
    stratify_param = y_arr if stratify and len(np.unique(y_arr)) > 1 else None

    # Split a positional index alongside X and y so the chosen rows can be
    # recorded; all arrays passed in one call share the same partition.
    parts = train_test_split(
        X, y, np.arange(len(y_arr)),
        test_size=test_size,
        random_state=self.random_state,
        stratify=stratify_param
    )
    self.train_index_, self.test_index_ = parts[4], parts[5]
    return parts[:4]

split_df

split_df(df: DataFrame, target: str, exclude_cols: Optional[List[str]] = None, test_size: Optional[float] = None) -> Tuple[DataFrame, DataFrame]

Split DataFrame while excluding certain columns from split.

参数:

名称 类型 描述 默认
df DataFrame

Input DataFrame.

必需
target str

Target column name.

必需
exclude_cols list of str

Columns to exclude from split.

None
test_size float

Override default test size.

None

返回:

类型 描述
tuple

(train_df, test_df)

示例:

>>> train_df, test_df = splitter.split_df(df, 'target', exclude_cols=['id', 'date'])
源代码位于: Modeling_Tool/Sample/Sample_Split.py
def split_df(self, df: pd.DataFrame, target: str,
            exclude_cols: Optional[List[str]] = None,
            test_size: Optional[float] = None) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Split DataFrame while excluding certain columns from split.

    Columns listed in ``exclude_cols`` are dropped from both returned
    frames; the target column is re-attached as the last column. The
    zero-based row positions of each side are stored in
    ``self.train_index_`` and ``self.test_index_``.

    Parameters
    ----------
    df : pandas.DataFrame
        Input DataFrame.
    target : str
        Target column name.
    exclude_cols : list of str, optional
        Columns to exclude from split. A single column name given as a
        plain string is also accepted.
    test_size : float, optional
        Override default test size.

    Returns
    -------
    tuple
        (train_df, test_df)

    Examples
    --------
    >>> train_df, test_df = splitter.split_df(df, 'target', exclude_cols=['id', 'date'])
    """
    test_size = test_size if test_size is not None else self.test_size

    if isinstance(exclude_cols, str):
        exclude_cols = [exclude_cols]
    # Build the exclusion set once instead of once per column.
    excluded = set(exclude_cols or [])
    excluded.add(target)
    feature_cols = [c for c in df.columns if c not in excluded]

    X = df[feature_cols]
    y = df[target]

    strat = y if self.stratify else None

    # Split a positional index alongside X and y so the chosen rows can be
    # recorded; all arrays passed in one call share the same partition.
    X_train, X_test, y_train, y_test, train_pos, test_pos = train_test_split(
        X, y, np.arange(len(y)),
        test_size=test_size,
        random_state=self.random_state,
        stratify=strat
    )
    self.train_index_ = train_pos
    self.test_index_ = test_pos

    train_df = X_train.copy()
    train_df[target] = y_train

    test_df = X_test.copy()
    test_df[target] = y_test

    return train_df, test_df

StratifiedSampler

Stratified sampling with target balance control.

This class provides stratified sampling that maintains the target distribution while allowing controlled sampling.

参数:

名称 类型 描述 默认
target_rate float

Target bad rate in the sample.

None
random_state int

Random seed.

None

方法:

名称 描述
sample

Perform stratified sampling.

balance

Balance sample by adjusting target distribution.

示例:

>>> sampler = StratifiedSampler(target_rate=0.15)
>>> balanced = sampler.balance(df, 'target', method='undersample')
源代码位于: Modeling_Tool/Sample/Sample_Split.py
class StratifiedSampler:
    """
    Stratified sampling with target balance control.

    ``sample`` draws rows separately within each target class so the class
    mix of the input is kept; ``balance`` changes the class mix by
    under- or over-sampling rows with target 0 ("goods") and target 1
    ("bads").

    Parameters
    ----------
    target_rate : float, optional
        Target bad rate in the sample. Used by the 'undersample' and
        'oversample' balancing methods; the SMOTE path does not read it.
    random_state : int, optional
        Random seed.

    Attributes
    ----------
    original_rate_ : float or None
        Mean of the target column seen by the most recent ``balance`` call.

    Methods
    -------
    sample(df, target, n_samples=None, sample_frac=None)
        Perform stratified sampling.
    balance(df, target, method='undersample')
        Balance sample by adjusting target distribution.

    Examples
    --------
    >>> sampler = StratifiedSampler(target_rate=0.15)
    >>> balanced = sampler.balance(df, 'target', method='undersample')
    """

    def __init__(self, target_rate: Optional[float] = None,
                 random_state: Optional[int] = None):
        """
        Initialize StratifiedSampler.

        Parameters
        ----------
        target_rate : float, optional
            Target bad rate in the sample.
        random_state : int, optional
            Random seed.
        """
        self.target_rate = target_rate
        self.random_state = random_state
        self.original_rate_ = None

    def sample(self, df: pd.DataFrame, target: str,
              n_samples: Optional[int] = None,
              sample_frac: Optional[float] = None) -> pd.DataFrame:
        """
        Perform stratified sampling.

        Rows are drawn within each distinct value of ``target`` (missing
        values form their own stratum), then the draws are concatenated and
        shuffled. ``sample_frac`` takes precedence over ``n_samples``; with
        neither given, a copy of ``df`` is returned.

        Parameters
        ----------
        df : pandas.DataFrame
            Input data.
        target : str
            Target column name.
        n_samples : int, optional
            Number of samples to draw. Allocated to the strata in proportion
            to their size (largest-remainder rounding), so the total is
            exactly ``n_samples``.
        sample_frac : float, optional
            Fraction of data to sample from every stratum.

        Returns
        -------
        pandas.DataFrame
            Sampled DataFrame.

        Raises
        ------
        ValueError
            Raised by pandas when more rows are requested than are available.

        Examples
        --------
        >>> sampled = sampler.sample(df, 'target', sample_frac=0.5)
        """
        if sample_frac is None and n_samples is None:
            return df.copy()

        strata = [part for _, part in df.groupby(target, sort=False, dropna=False)]

        # With zero or one stratum a plain random draw is already stratified.
        if len(strata) < 2:
            if sample_frac is not None:
                return df.sample(frac=sample_frac, random_state=self.random_state)
            return df.sample(n=n_samples, random_state=self.random_state)

        if sample_frac is not None:
            drawn = [
                part.sample(frac=sample_frac, random_state=self.random_state)
                for part in strata
            ]
        else:
            sizes = np.array([len(part) for part in strata])
            exact = sizes * n_samples / sizes.sum()
            quotas = np.floor(exact).astype(int)
            # Hand the rows lost to flooring to the strata with the largest
            # fractional remainders so the quotas sum to n_samples.
            shortfall = int(n_samples - quotas.sum())
            if shortfall > 0:
                by_remainder = np.argsort(-(exact - quotas), kind="stable")
                quotas[by_remainder[:shortfall]] += 1
            drawn = [
                part.sample(n=int(quota), random_state=self.random_state)
                for part, quota in zip(strata, quotas)
            ]

        return pd.concat(drawn).sample(frac=1, random_state=self.random_state)

    def balance(self, df: pd.DataFrame, target: str,
               method: str = 'undersample') -> pd.DataFrame:
        """
        Balance sample by adjusting target distribution.

        Stores the mean of the target column in ``original_rate_`` before
        resampling.

        Parameters
        ----------
        df : pandas.DataFrame
            Input data.
        target : str
            Target column name.
        method : str, default 'undersample'
            Balancing method: 'undersample', 'oversample', 'smote'.

        Returns
        -------
        pandas.DataFrame
            Balanced DataFrame.

        Raises
        ------
        ValueError
            If ``method`` is not one of the supported names.

        Examples
        --------
        >>> balanced = sampler.balance(df, 'target', method='undersample')
        """
        self.original_rate_ = df[target].mean()

        if method == 'undersample':
            return self._undersample(df, target)
        elif method == 'oversample':
            return self._oversample(df, target)
        elif method == 'smote':
            return self._smote(df, target)
        else:
            raise ValueError(f"Unknown method: {method}")

    def _undersample(self, df: pd.DataFrame, target: str) -> pd.DataFrame:
        """
        Undersample majority class.

        With ``target_rate`` set, goods are reduced so that bads make up
        roughly that share (never drawing more goods than exist). Without
        it, whichever class is larger is reduced to the size of the smaller.
        """
        goods = df[df[target] == 0]
        bads = df[df[target] == 1]

        if self.target_rate is not None:
            if not 0 < self.target_rate <= 1:
                raise ValueError("target_rate must be in (0, 1] for undersampling")
            n_bads = len(bads)
            n_goods = int(n_bads * (1 - self.target_rate) / self.target_rate)
            goods = goods.sample(n=min(n_goods, len(goods)), random_state=self.random_state)
        elif len(bads) <= len(goods):
            goods = goods.sample(n=len(bads), random_state=self.random_state)
        else:
            # Bads are the majority here; asking for len(bads) goods would
            # exceed the available rows, so shrink the bads instead.
            bads = bads.sample(n=len(goods), random_state=self.random_state)

        return pd.concat([goods, bads]).sample(frac=1, random_state=self.random_state)

    def _oversample(self, df: pd.DataFrame, target: str) -> pd.DataFrame:
        """
        Oversample minority class.

        Only acts when bads are fewer than goods: bads are redrawn with
        replacement, up to the goods count or to the count implied by
        ``target_rate``. Otherwise the rows are only shuffled.
        """
        goods = df[df[target] == 0]
        bads = df[df[target] == 1]

        if len(bads) < len(goods):
            if self.target_rate is not None:
                if not 0 <= self.target_rate < 1:
                    raise ValueError("target_rate must be in [0, 1) for oversampling")
                n_goods = len(goods)
                n_bads = int(n_goods * self.target_rate / (1 - self.target_rate))
                bads = bads.sample(n=n_bads, replace=True, random_state=self.random_state)
            else:
                bads = bads.sample(n=len(goods), replace=True, random_state=self.random_state)

        return pd.concat([goods, bads]).sample(frac=1, random_state=self.random_state)

    def _smote(self, df: pd.DataFrame, target: str) -> pd.DataFrame:
        """
        SMOTE oversampling (requires imbalanced-learn).

        Falls back to ``_oversample`` when there are fewer than 6 bads.
        All non-target columns are passed to SMOTE as a plain array, and the
        result is returned with a fresh default index.
        """
        try:
            from imblearn.over_sampling import SMOTE
        except ImportError as err:
            raise ImportError("imbalanced-learn required for SMOTE. Install with: pip install imbalanced-learn") from err

        n_bads = int((df[target] == 1).sum())
        if n_bads < 6:
            return self._oversample(df, target)

        X_cols = [c for c in df.columns if c != target]
        X = df[X_cols].values
        y = df[target].values

        smote = SMOTE(random_state=self.random_state)
        X_resampled, y_resampled = smote.fit_resample(X, y)

        return pd.DataFrame(X_resampled, columns=X_cols).assign(**{target: y_resampled})

sample

sample(df: DataFrame, target: str, n_samples: Optional[int] = None, sample_frac: Optional[float] = None) -> DataFrame

Perform stratified sampling.

参数:

名称 类型 描述 默认
df DataFrame

Input data.

必需
target str

Target column name.

必需
n_samples int

Number of samples to draw.

None
sample_frac float

Fraction of data to sample.

None

返回:

类型 描述
DataFrame

Sampled DataFrame.

示例:

>>> sampled = sampler.sample(df, 'target', sample_frac=0.5)
源代码位于: Modeling_Tool/Sample/Sample_Split.py
def sample(self, df: pd.DataFrame, target: str,
          n_samples: Optional[int] = None,
          sample_frac: Optional[float] = None) -> pd.DataFrame:
    """
    Perform stratified sampling.

    Rows are drawn within each distinct value of ``target`` (missing values
    form their own stratum), then the draws are concatenated and shuffled.
    ``sample_frac`` takes precedence over ``n_samples``; with neither given,
    a copy of ``df`` is returned.

    Parameters
    ----------
    df : pandas.DataFrame
        Input data.
    target : str
        Target column name.
    n_samples : int, optional
        Number of samples to draw. Allocated to the strata in proportion to
        their size (largest-remainder rounding), so the total is exactly
        ``n_samples``.
    sample_frac : float, optional
        Fraction of data to sample from every stratum.

    Returns
    -------
    pandas.DataFrame
        Sampled DataFrame.

    Raises
    ------
    ValueError
        Raised by pandas when more rows are requested than are available.

    Examples
    --------
    >>> sampled = sampler.sample(df, 'target', sample_frac=0.5)
    """
    if sample_frac is None and n_samples is None:
        return df.copy()

    strata = [part for _, part in df.groupby(target, sort=False, dropna=False)]

    # With zero or one stratum a plain random draw is already stratified.
    if len(strata) < 2:
        if sample_frac is not None:
            return df.sample(frac=sample_frac, random_state=self.random_state)
        return df.sample(n=n_samples, random_state=self.random_state)

    if sample_frac is not None:
        drawn = [
            part.sample(frac=sample_frac, random_state=self.random_state)
            for part in strata
        ]
    else:
        sizes = np.array([len(part) for part in strata])
        exact = sizes * n_samples / sizes.sum()
        quotas = np.floor(exact).astype(int)
        # Hand the rows lost to flooring to the strata with the largest
        # fractional remainders so the quotas sum to n_samples.
        shortfall = int(n_samples - quotas.sum())
        if shortfall > 0:
            by_remainder = np.argsort(-(exact - quotas), kind="stable")
            quotas[by_remainder[:shortfall]] += 1
        drawn = [
            part.sample(n=int(quota), random_state=self.random_state)
            for part, quota in zip(strata, quotas)
        ]

    return pd.concat(drawn).sample(frac=1, random_state=self.random_state)

balance

balance(df: DataFrame, target: str, method: str = 'undersample') -> DataFrame

Balance sample by adjusting target distribution.

参数:

名称 类型 描述 默认
df DataFrame

Input data.

必需
target str

Target column name.

必需
method str

Balancing method: 'undersample', 'oversample', 'smote'.

'undersample'

返回:

类型 描述
DataFrame

Balanced DataFrame.

示例:

>>> balanced = sampler.balance(df, 'target', method='undersample')
源代码位于: Modeling_Tool/Sample/Sample_Split.py
def balance(self, df: pd.DataFrame, target: str,
           method: str = 'undersample') -> pd.DataFrame:
    """
    Rebalance the target distribution of ``df`` with the chosen technique.

    The mean of the target column is stored in ``self.original_rate_``
    before any resampling is attempted.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to rebalance.
    target : str
        Name of the target column.
    method : str, default 'undersample'
        One of 'undersample', 'oversample' or 'smote'.

    Returns
    -------
    pandas.DataFrame
        Whatever the selected private resampler returns.

    Raises
    ------
    ValueError
        If ``method`` is not a supported name.

    Examples
    --------
    >>> balanced = sampler.balance(df, 'target', method='undersample')
    """
    observed_rate = df[target].mean()
    self.original_rate_ = observed_rate

    # (public method name, private handler) pairs, checked in order.
    handlers = (
        ('undersample', '_undersample'),
        ('oversample', '_oversample'),
        ('smote', '_smote'),
    )
    for label, handler_name in handlers:
        if method == label:
            return getattr(self, handler_name)(df, target)

    raise ValueError(f"Unknown method: {method}")

SampleBalancer

Advanced sample balancing with multiple methods.

This class provides various sampling techniques to handle class imbalance in credit modeling.

参数:

名称 类型 描述 默认
method str

Balancing method.

'random'
target_ratio float

Desired minority/majority ratio.

None
random_state int

Random seed.

None

方法:

名称 描述
fit_resample

Resample features and target.

get_balanced_indices

Get indices for balanced sampling.

示例:

>>> balancer = SampleBalancer(method='nearmiss')
>>> X_bal, y_bal = balancer.fit_resample(X, y)
源代码位于: Modeling_Tool/Sample/Sample_Split.py
class SampleBalancer:
    """
    Advanced sample balancing with multiple methods.

    This class provides various sampling techniques to handle
    class imbalance in credit modeling. Labels are expected to be 0
    ("goods") and 1 ("bads") for the built-in random undersampler.

    Parameters
    ----------
    method : str, default 'random'
        Balancing method: 'random', 'nearmiss', 'tomek' or 'enn'.
    target_ratio : float, optional
        Controls how many goods the random undersampler keeps:
        ``n_goods = n_bads * (1 - target_ratio) / target_ratio``, i.e. the
        value acts as the share of bads in the result.
        NOTE(review): earlier documentation called this a minority/majority
        ratio, which does not match the formula; confirm the intended meaning.
    random_state : int, optional
        Random seed (used by the random undersampler).

    Methods
    -------
    fit_resample(X, y)
        Resample features and target.
    get_balanced_indices(y)
        Get indices for balanced sampling.

    Examples
    --------
    >>> balancer = SampleBalancer(method='nearmiss')
    >>> X_bal, y_bal = balancer.fit_resample(X, y)
    """

    def __init__(self, method: str = 'random',
                 target_ratio: Optional[float] = None,
                 random_state: Optional[int] = None):
        """
        Initialize SampleBalancer.

        Parameters
        ----------
        method : str, default 'random'
            Balancing method.
        target_ratio : float, optional
            Share of bads targeted by the random undersampler.
        random_state : int, optional
            Random seed.
        """
        self.method = method
        self.target_ratio = target_ratio
        self.random_state = random_state

    def fit_resample(self, X: Union[pd.DataFrame, np.ndarray],
                    y: Union[pd.Series, np.ndarray]) -> Tuple:
        """
        Resample data to balance classes.

        Parameters
        ----------
        X : pandas.DataFrame or numpy.ndarray
            Features.
        y : pandas.Series or numpy.ndarray
            Target.

        Returns
        -------
        tuple
            (X_resampled, y_resampled)

        Raises
        ------
        ValueError
            If ``self.method`` is not a supported name.

        Examples
        --------
        >>> X_bal, y_bal = balancer.fit_resample(X, y)
        """
        if self.method == 'random':
            return self._random_undersample(X, y)
        elif self.method == 'nearmiss':
            return self._nearmiss(X, y)
        elif self.method == 'tomek':
            return self._tomek_links(X, y)
        elif self.method == 'enn':
            return self._edited_nn(X, y)
        else:
            raise ValueError(f"Unknown method: {self.method}")

    def get_balanced_indices(self, y) -> np.ndarray:
        """
        Get indices for balanced sampling.

        Parameters
        ----------
        y : pandas.Series or numpy.ndarray
            Target with labels 0 (goods) and 1 (bads).

        Returns
        -------
        numpy.ndarray
            Shuffled zero-based row positions of the rows to keep.
        """
        labels = np.asarray(y)
        goods_indices = np.where(labels == 0)[0]
        bads_indices = np.where(labels == 1)[0]
        n_bads = len(bads_indices)

        # A private generator keeps the global NumPy random state untouched
        # while producing the same draws as seeding the global one would.
        rng = np.random.RandomState(self.random_state)

        if self.target_ratio:
            n_goods = int(n_bads * (1 - self.target_ratio) / self.target_ratio)
            # Never ask for more goods than exist.
            n_goods = min(n_goods, len(goods_indices))
            goods_sample = rng.choice(goods_indices, size=n_goods, replace=False)
            bads_sample = bads_indices
        else:
            # 1:1 balance: shrink whichever class is larger.
            n_keep = min(n_bads, len(goods_indices))
            goods_sample = rng.choice(goods_indices, size=n_keep, replace=False)
            if n_bads > n_keep:
                bads_sample = rng.choice(bads_indices, size=n_keep, replace=False)
            else:
                bads_sample = bads_indices

        selected = np.concatenate([goods_sample, bads_sample])
        rng.shuffle(selected)
        return selected

    @staticmethod
    def _take_rows(data, positions):
        """Select rows by position from a pandas object or an array."""
        if hasattr(data, 'iloc'):
            return data.iloc[positions].copy()
        return np.asarray(data)[positions]

    def _random_undersample(self, X, y):
        """
        Random undersampling.

        Keeps the rows chosen by ``get_balanced_indices``; ``X`` and ``y``
        are each indexed according to their own type, so mixing a DataFrame
        with an ndarray target works.
        """
        selected = self.get_balanced_indices(y)
        return self._take_rows(X, selected), self._take_rows(y, selected)

    def _nearmiss(self, X, y):
        """
        NearMiss undersampling.

        ``random_state`` is not forwarded: imbalanced-learn removed that
        argument from this sampler and passing it raises ``TypeError`` there.
        """
        try:
            from imblearn.under_sampling import NearMiss
        except ImportError as err:
            raise ImportError("imbalanced-learn required. Install with: pip install imbalanced-learn") from err

        nm = NearMiss(version=1)
        return nm.fit_resample(X, y)

    def _tomek_links(self, X, y):
        """
        Tomek links cleaning.

        ``random_state`` is not forwarded: imbalanced-learn removed that
        argument from this sampler and passing it raises ``TypeError`` there.
        """
        try:
            from imblearn.under_sampling import TomekLinks
        except ImportError as err:
            raise ImportError("imbalanced-learn required. Install with: pip install imbalanced-learn") from err

        tl = TomekLinks()
        return tl.fit_resample(X, y)

    def _edited_nn(self, X, y):
        """
        Edited Nearest Neighbors cleaning.

        ``random_state`` is not forwarded: imbalanced-learn removed that
        argument from this sampler and passing it raises ``TypeError`` there.
        """
        try:
            from imblearn.under_sampling import EditedNearestNeighbours
        except ImportError as err:
            raise ImportError("imbalanced-learn required. Install with: pip install imbalanced-learn") from err

        enn = EditedNearestNeighbours()
        return enn.fit_resample(X, y)

fit_resample

fit_resample(X: Union[DataFrame, ndarray], y: Union[Series, ndarray]) -> Tuple

Resample data to balance classes.

参数:

名称 类型 描述 默认
X DataFrame or ndarray

Features.

必需
y Series or ndarray

Target.

必需

返回:

类型 描述
tuple

(X_resampled, y_resampled)

示例:

>>> X_bal, y_bal = balancer.fit_resample(X, y)
源代码位于: Modeling_Tool/Sample/Sample_Split.py
def fit_resample(self, X: Union[pd.DataFrame, np.ndarray],
                y: Union[pd.Series, np.ndarray]) -> Tuple:
    """
    Balance the classes of ``(X, y)`` using the configured method.

    Dispatches on ``self.method`` to the matching private resampler.

    Parameters
    ----------
    X : pandas.DataFrame or numpy.ndarray
        Feature matrix.
    y : pandas.Series or numpy.ndarray
        Target labels.

    Returns
    -------
    tuple
        (X_resampled, y_resampled)

    Raises
    ------
    ValueError
        If ``self.method`` is not 'random', 'nearmiss', 'tomek' or 'enn'.

    Examples
    --------
    >>> X_bal, y_bal = balancer.fit_resample(X, y)
    """
    # (method name, private resampler) pairs, checked in order.
    routes = (
        ('random', '_random_undersample'),
        ('nearmiss', '_nearmiss'),
        ('tomek', '_tomek_links'),
        ('enn', '_edited_nn'),
    )
    for label, resampler_name in routes:
        if self.method == label:
            return getattr(self, resampler_name)(X, y)

    raise ValueError(f"Unknown method: {self.method}")

select_sample_seed

select_sample_seed(master_df, oot_split_col, model, tgt_name, seed_range=(3000, 3050), ins_prop=0.7)

Select Best Seed for Sample Splitting.

源代码位于: Modeling_Tool/Sample/Sample_Split.py
def select_sample_seed(master_df, oot_split_col, model, tgt_name, seed_range = (3000, 3050), ins_prop = 0.7):
    """
    Select Best Seed for Sample Splitting.

    For every seed in ``range(seed_range[0], seed_range[1])`` the
    development rows are split into in-sample / out-of-sample parts, the
    parts are evaluated together with the out-of-time rows, and the
    evaluation result is tagged with the seed.

    Parameters
    ----------
    master_df : pandas.DataFrame
        Full data. Rows where ``oot_split_col`` equals 1 are split into
        in-sample / out-of-sample; rows where it equals 2 are used as the
        out-of-time set.
    oot_split_col : str
        Name of the column holding the 1 / 2 flag described above.
    model : object or str
        A fitted model (its features are obtained with
        ``get_feature_names``), or the name of an existing score column.
    tgt_name : str
        Target column name.
    seed_range : tuple of int, default (3000, 3050)
        Half-open range of seeds to try.
    ins_prop : float, default 0.7
        Proportion of development rows kept in-sample.

    Returns
    -------
    pandas.DataFrame
        Evaluation results of all seeds stacked row-wise, with a ``seed``
        column; empty when ``seed_range`` contains no seeds.
    """

    from tqdm import tqdm

    uses_score_col = isinstance(model, str)

    # These do not depend on the seed, so compute them once. The OOT part is
    # copied before labelling so the label is not written onto a slice of
    # master_df.
    train_df = master_df.loc[master_df[oot_split_col].isin([1])]
    oot_part = master_df.loc[master_df[oot_split_col].isin([2])].copy()
    oot_part['sample_ind_fnl'] = "oot"

    per_seed_results = []
    for seed in tqdm(range(seed_range[0], seed_range[1])):

        sampler = SampleSplitter(test_size = (1 - ins_prop), random_state=seed, stratify=True)
        mdl_df, val_df = sampler.split_df(train_df, tgt_name)

        mdl_df['sample_ind_fnl'] = "ins"
        val_df['sample_ind_fnl'] = "oos"

        # Concatenating aligns all three parts on a common column layout
        # before they are handed to the evaluator.
        drv_w_sample_ind = pd.concat([mdl_df, val_df, oot_part])

        ins_df = drv_w_sample_ind.loc[drv_w_sample_ind['sample_ind_fnl'].isin(['ins'])]
        oos_df = drv_w_sample_ind.loc[drv_w_sample_ind['sample_ind_fnl'].isin(['oos'])]
        oot_df = drv_w_sample_ind.loc[drv_w_sample_ind['sample_ind_fnl'].isin(['oot'])]

        if uses_score_col:
            evaluator = PerformanceEvaluator(tgt_name=tgt_name, scr_name = model)
        else:
            evaluator = PerformanceEvaluator(tgt_name=tgt_name, model=model, feature_cols=get_feature_names(model))

        if ins_df.shape[0] > 0:
            evaluator = evaluator.add_dataset('train', ins_df)

        if oos_df.shape[0] > 0:
            evaluator = evaluator.add_dataset('validation', oos_df)

        if oot_df.shape[0] > 0:
            evaluator = evaluator.add_dataset('oot', oot_df)

        result = evaluator.evaluate(to_show=False, display = False)
        result['seed'] = seed

        per_seed_results.append(result)

    if not per_seed_results:
        return pd.DataFrame()

    # One concat at the end avoids re-copying the accumulated frame per seed.
    return pd.concat(per_seed_results, axis = 0)

拒绝推断 — Reject_Infer

Reject_Infer

Reject inference classes for credit modeling.

This module provides classes for applying reject inference techniques to handle the selection bias in credit modeling when using approved loan data only.

类:

名称 描述
RejectInferrer : Base class for reject inference.
RejectInferenceFactory : Factory for creating reject inference methods.
ParcelingInferrer : Parceling method for reject inference.
FuzzyAugmentInferrer : Fuzzy augmentation method.
HardCutoffInferrer : Hard cutoff method.
SimpleAugmentInferrer : Simple augmentation method.

示例:

>>> from Modeling_Tool_refactored.sample import RejectInferenceFactory
>>> inferrer = RejectInferenceFactory.create('parceling')
>>> df_inferred = inferrer.infer(df_approved, df_rejected, 'score')

RejectInferrer

Bases: ABC

Abstract base class for reject inference methods.

Reject inference is used to address selection bias when building credit models on approved loans only.

参数:

名称 类型 描述 默认
target_col str

Name of the target column.

'target'
score_col str

Name of the score/probability column.

'score'

方法:

名称 描述
infer

Apply reject inference.

源代码位于: Modeling_Tool/Sample/Reject_Infer.py
class RejectInferrer(ABC):
    """
    Common base for reject inference strategies.

    Reject inference addresses the selection bias that arises when a credit
    model is built on approved loans only. Subclasses implement ``infer``;
    this base stores the shared configuration and offers two helpers.

    Parameters
    ----------
    target_col : str, default 'target'
        Name of the target column.
    score_col : str, default 'score'
        Name of the score/probability column.
    score_direction : {'high_good', 'high_bad'}, default 'high_good'
        Whether a higher score means a better or a worse applicant.
    random_state : int, optional
        Seed handed to the NumPy generator created by ``_rng``.

    Methods
    -------
    infer(df_approved, df_rejected, score_col)
        Apply reject inference.
    """

    def __init__(
        self,
        target_col: str = 'target',
        score_col: str = 'score',
        score_direction: str = 'high_good',
        random_state: Optional[int] = None,
    ):
        """
        Store the configuration after validating ``score_direction``.

        Parameters
        ----------
        target_col : str, default 'target'
            Target column name.
        score_col : str, default 'score'
            Score column name.
        score_direction : str, default 'high_good'
            Either 'high_good' or 'high_bad'.
        random_state : int, optional
            Random seed.

        Raises
        ------
        ValueError
            If ``score_direction`` is not one of the two accepted values.
        """
        accepted_directions = {"high_good", "high_bad"}
        if score_direction not in accepted_directions:
            raise ValueError("score_direction must be 'high_good' or 'high_bad'")

        self.random_state = random_state
        self.score_direction = score_direction
        self.score_col = score_col
        self.target_col = target_col

    def _bad_probability(self, score: pd.Series) -> pd.Series:
        """
        Turn a score series into a bad probability clipped to [0, 1].

        Values that cannot be parsed as numbers become NaN. For
        'high_good' scores the result is ``1 - score``.
        """
        numeric = pd.to_numeric(score, errors="coerce").astype(float)
        higher_is_better = self.score_direction == "high_good"
        bad = 1.0 - numeric if higher_is_better else numeric
        return bad.clip(0.0, 1.0)

    def _rng(self) -> np.random.Generator:
        """Create a new NumPy generator seeded with ``random_state``."""
        seed = self.random_state
        return np.random.default_rng(seed)

    @abstractmethod
    def infer(self, df_approved: pd.DataFrame,
              df_rejected: pd.DataFrame,
              score_col: Optional[str] = None) -> pd.DataFrame:
        """
        Apply reject inference.

        Parameters
        ----------
        df_approved : pandas.DataFrame
            Approved applications (target available).
        df_rejected : pandas.DataFrame
            Rejected applications (target missing).
        score_col : str, optional
            Score column name.

        Returns
        -------
        pandas.DataFrame
            Combined DataFrame with inferred targets for rejected applications.
        """
        return None

infer abstractmethod

infer(df_approved: DataFrame, df_rejected: DataFrame, score_col: Optional[str] = None) -> DataFrame

Apply reject inference.

参数:

名称 类型 描述 默认
df_approved DataFrame

DataFrame with approved applications (has target).

必需
df_rejected DataFrame

DataFrame with rejected applications (no target).

必需
score_col str

Score column name.

None

返回:

类型 描述
DataFrame

Combined DataFrame with inferred targets for rejected applications.

源代码位于: Modeling_Tool/Sample/Reject_Infer.py
@abstractmethod
def infer(self, df_approved: pd.DataFrame,
          df_rejected: pd.DataFrame,
          score_col: Optional[str] = None) -> pd.DataFrame:
    """
    Apply reject inference.

    Parameters
    ----------
    df_approved : pandas.DataFrame
        DataFrame with approved applications (has target).
    df_rejected : pandas.DataFrame
        DataFrame with rejected applications (no target).
    score_col : str, optional
        Score column name.

    Returns
    -------
    pandas.DataFrame
        Combined DataFrame with inferred targets for rejected applications.
    """
    pass

SimpleAugmentInferrer

Bases: RejectInferrer

Simple augmentation reject inference method.

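Labels each rejected application good or bad by a random Bernoulli draw whose bad probability is the average bad rate of the approved applications (or the `bad_rate` override when given).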

参数:

名称 类型 描述 默认
bad_rate float

Override bad rate to use.

None

示例:

>>> inferrer = SimpleAugmentInferrer()
>>> df_combined = inferrer.infer(df_approved, df_rejected)
源代码位于: Modeling_Tool/Sample/Reject_Infer.py
class SimpleAugmentInferrer(RejectInferrer):
    """
    Simple augmentation reject inference method.

    Every rejected application receives a randomly drawn 0/1 target. The
    probability of drawing bad (1) is one global rate: the ``bad_rate``
    override when given, otherwise the mean of the target column over
    the approved applications.

    Parameters
    ----------
    target_col : str, default 'target'
        Target column name; forwarded to ``RejectInferrer``.
    score_col : str, default 'score'
        Score column name; forwarded to ``RejectInferrer``. This method
        does not read any score.
    bad_rate : float, optional
        Override bad rate to use. Must lie in [0, 1].
    score_direction : str, default 'high_good'
        Forwarded to ``RejectInferrer``.
    random_state : int, optional
        Forwarded to ``RejectInferrer``.

    Examples
    --------
    >>> inferrer = SimpleAugmentInferrer()
    >>> df_combined = inferrer.infer(df_approved, df_rejected)
    """

    def __init__(
        self,
        target_col: str = 'target',
        score_col: str = 'score',
        bad_rate: Optional[float] = None,
        score_direction: str = 'high_good',
        random_state: Optional[int] = None,
    ):
        """
        Initialize SimpleAugmentInferrer.

        All arguments except ``bad_rate`` are handed to the base class
        initializer; ``bad_rate`` is stored on the instance.
        """
        super().__init__(target_col, score_col, score_direction=score_direction, random_state=random_state)
        self.bad_rate = bad_rate

    def infer(self, df_approved: pd.DataFrame,
              df_rejected: pd.DataFrame,
              score_col: Optional[str] = None) -> pd.DataFrame:
        """
        Apply simple augmentation.

        Parameters
        ----------
        df_approved : pandas.DataFrame
            Approved applications; must contain the target column unless
            ``bad_rate`` was supplied.
        df_rejected : pandas.DataFrame
            Rejected applications. Not modified; a copy is labelled.
        score_col : str, optional
            Accepted for interface compatibility; scores are not used.

        Returns
        -------
        pandas.DataFrame
            Approved rows followed by the labelled rejected rows, with a
            fresh 0..n-1 index.

        Raises
        ------
        ValueError
            If the bad rate is NaN or outside [0, 1] (for example when
            the approved frame has no usable target values).
        """
        if self.bad_rate is None:
            bad_rate = df_approved[self.target_col].mean()
        else:
            bad_rate = self.bad_rate

        # NaN fails the chained comparison, so it is rejected here as well
        # instead of surfacing later as an opaque sampler error.
        if not 0.0 <= bad_rate <= 1.0:
            raise ValueError(
                f"bad rate must be within [0, 1], got {bad_rate!r}"
            )

        # One Bernoulli draw per rejected row, all with the same probability.
        inferred_target = self._rng().binomial(1, bad_rate, len(df_rejected))

        df_rejected_copy = df_rejected.copy()
        df_rejected_copy[self.target_col] = inferred_target

        return pd.concat([df_approved, df_rejected_copy], ignore_index=True)

infer

infer(df_approved: DataFrame, df_rejected: DataFrame, score_col: Optional[str] = None) -> DataFrame

Apply simple augmentation.

参数:

名称 类型 描述 默认
df_approved DataFrame

Approved applications.

必需
df_rejected DataFrame

Rejected applications.

必需
score_col str

Score column.

None

返回:

类型 描述
DataFrame

Combined data with inferred targets.

源代码位于: Modeling_Tool/Sample/Reject_Infer.py
def infer(self, df_approved: pd.DataFrame,
          df_rejected: pd.DataFrame,
          score_col: Optional[str] = None) -> pd.DataFrame:
    """
    Apply simple augmentation.

    Each rejected row receives a random 0/1 target drawn with a single
    probability of bad: ``self.bad_rate`` when set, otherwise the mean of
    the target column over ``df_approved``.

    Parameters
    ----------
    df_approved : pandas.DataFrame
        Approved applications; must contain the target column unless
        ``self.bad_rate`` is set.
    df_rejected : pandas.DataFrame
        Rejected applications. Not modified; a copy is labelled.
    score_col : str, optional
        Accepted for interface compatibility; scores are not used.

    Returns
    -------
    pandas.DataFrame
        Approved rows followed by the labelled rejected rows, with a
        fresh 0..n-1 index.

    Raises
    ------
    ValueError
        If the bad rate is NaN or outside [0, 1] (for example when the
        approved frame has no usable target values).
    """
    if self.bad_rate is None:
        bad_rate = df_approved[self.target_col].mean()
    else:
        bad_rate = self.bad_rate

    # NaN fails the chained comparison, so it is rejected here as well
    # instead of surfacing later as an opaque sampler error.
    if not 0.0 <= bad_rate <= 1.0:
        raise ValueError(
            f"bad rate must be within [0, 1], got {bad_rate!r}"
        )

    # One Bernoulli draw per rejected row, all with the same probability.
    inferred_target = self._rng().binomial(1, bad_rate, len(df_rejected))

    df_rejected_copy = df_rejected.copy()
    df_rejected_copy[self.target_col] = inferred_target

    return pd.concat([df_approved, df_rejected_copy], ignore_index=True)

HardCutoffInferrer

Bases: RejectInferrer

Hard cutoff reject inference method.

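Labels every rejected application as bad (target=1) or good (target=0) by comparing its score with a cutoff: with score_direction='high_bad', scores at or above the cutoff are bad; otherwise (default 'high_good'), scores at or below the cutoff are bad.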

参数:

名称 类型 描述 默认
cutoff float

Score cutoff threshold.

0.5

示例:

>>> inferrer = HardCutoffInferrer(cutoff=0.3)
>>> df_combined = inferrer.infer(df_approved, df_rejected, 'probability')
源代码位于: Modeling_Tool/Sample/Reject_Infer.py
class HardCutoffInferrer(RejectInferrer):
    """
    Hard cutoff reject inference method.

    Labels each rejected application deterministically by comparing its
    score with ``cutoff``. With ``score_direction='high_bad'`` a score at
    or above the cutoff is bad (target=1); for any other direction value
    a score at or below the cutoff is bad. Every other row is good
    (target=0).

    Parameters
    ----------
    target_col : str, default 'target'
        Target column name; forwarded to ``RejectInferrer``.
    score_col : str, default 'score'
        Default score column name; forwarded to ``RejectInferrer``.
    cutoff : float, default 0.5
        Score cutoff threshold (inclusive on the bad side).
    score_direction : str, default 'high_good'
        Forwarded to ``RejectInferrer``; selects the comparison.
    random_state : int, optional
        Forwarded to ``RejectInferrer``; this method draws nothing.

    Examples
    --------
    >>> inferrer = HardCutoffInferrer(cutoff=0.3)
    >>> df_combined = inferrer.infer(df_approved, df_rejected, 'probability')
    """

    def __init__(
        self,
        target_col: str = 'target',
        score_col: str = 'score',
        cutoff: float = 0.5,
        score_direction: str = 'high_good',
        random_state: Optional[int] = None,
    ):
        """
        Initialize HardCutoffInferrer.

        ``cutoff`` is kept on the instance; everything else goes to the
        base class initializer.
        """
        super().__init__(
            target_col,
            score_col,
            score_direction=score_direction,
            random_state=random_state,
        )
        self.cutoff = cutoff

    def infer(self, df_approved: pd.DataFrame,
              df_rejected: pd.DataFrame,
              score_col: Optional[str] = None) -> pd.DataFrame:
        """
        Apply hard cutoff inference.

        Parameters
        ----------
        df_approved : pandas.DataFrame
            Approved applications; passed through unchanged.
        df_rejected : pandas.DataFrame
            Rejected applications. Not modified; a copy is labelled.
        score_col : str, optional
            Score column; falls back to ``self.score_col`` when falsy.

        Returns
        -------
        pandas.DataFrame
            Approved rows followed by the labelled rejected rows, with a
            fresh 0..n-1 index.
        """
        column = score_col or self.score_col

        labelled = df_rejected.copy()
        scores = labelled[column]

        # The cutoff itself counts as bad in both directions. A NaN score
        # compares False either way and therefore ends up as target 0.
        if self.score_direction == "high_bad":
            is_bad = scores >= self.cutoff
        else:
            is_bad = scores <= self.cutoff
        labelled[self.target_col] = is_bad.astype(int)

        return pd.concat([df_approved, labelled], ignore_index=True)

infer

infer(df_approved: DataFrame, df_rejected: DataFrame, score_col: Optional[str] = None) -> DataFrame

Apply hard cutoff inference.

参数:

名称 类型 描述 默认
df_approved DataFrame

Approved applications.

必需
df_rejected DataFrame

Rejected applications.

必需
score_col str

Score column.

None

返回:

类型 描述
DataFrame

Combined data with inferred targets.

源代码位于: Modeling_Tool/Sample/Reject_Infer.py
def infer(self, df_approved: pd.DataFrame,
          df_rejected: pd.DataFrame,
          score_col: Optional[str] = None) -> pd.DataFrame:
    """
    Apply hard cutoff inference.

    With ``self.score_direction == 'high_bad'`` a rejected row is bad
    (target=1) when its score is at or above ``self.cutoff``; for any
    other direction it is bad when the score is at or below the cutoff.

    Parameters
    ----------
    df_approved : pandas.DataFrame
        Approved applications; passed through unchanged.
    df_rejected : pandas.DataFrame
        Rejected applications. Not modified; a copy is labelled.
    score_col : str, optional
        Score column; falls back to ``self.score_col`` when falsy.

    Returns
    -------
    pandas.DataFrame
        Approved rows followed by the labelled rejected rows, with a
        fresh 0..n-1 index.
    """
    column = score_col or self.score_col

    labelled = df_rejected.copy()
    scores = labelled[column]

    # The cutoff itself counts as bad in both directions. A NaN score
    # compares False either way and therefore ends up as target 0.
    if self.score_direction == "high_bad":
        is_bad = scores >= self.cutoff
    else:
        is_bad = scores <= self.cutoff
    labelled[self.target_col] = is_bad.astype(int)

    return pd.concat([df_approved, labelled], ignore_index=True)

FuzzyAugmentInferrer

Bases: RejectInferrer

Fuzzy augmentation reject inference method.

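Keeps every approved application at weight 1 and adds each rejected application twice — once as bad (target=1) weighted by its estimated bad probability and once as good (target=0) weighted by the complement — with both weights scaled by weight_factor.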

参数:

名称 类型 描述 默认
weight_factor float

Factor to adjust weights.

1.0

示例:

>>> inferrer = FuzzyAugmentInferrer(weight_factor=0.9)
>>> df_combined = inferrer.infer(df_approved, df_rejected, 'probability')
源代码位于: Modeling_Tool/Sample/Reject_Infer.py
class FuzzyAugmentInferrer(RejectInferrer):
    """
    Fuzzy augmentation reject inference method.

    Approved applications keep a ``_weight`` of 1. Each rejected
    application is emitted twice: once with target 1 and once with
    target 0, weighted by its bad probability and the complement
    respectively, both multiplied by ``weight_factor``.

    Parameters
    ----------
    target_col : str, default 'target'
        Target column name; forwarded to ``RejectInferrer``.
    score_col : str, default 'score'
        Default score column name; forwarded to ``RejectInferrer``.
    weight_factor : float, default 1.0
        Factor to adjust weights of the rejected rows.
    score_direction : str, default 'high_good'
        Forwarded to ``RejectInferrer``.
    random_state : int, optional
        Forwarded to ``RejectInferrer``; this method draws nothing.

    Examples
    --------
    >>> inferrer = FuzzyAugmentInferrer(weight_factor=0.9)
    >>> df_combined = inferrer.infer(df_approved, df_rejected, 'probability')
    """

    def __init__(
        self,
        target_col: str = 'target',
        score_col: str = 'score',
        weight_factor: float = 1.0,
        score_direction: str = 'high_good',
        random_state: Optional[int] = None,
    ):
        """
        Initialize FuzzyAugmentInferrer.

        ``weight_factor`` is kept on the instance; everything else goes
        to the base class initializer.
        """
        super().__init__(
            target_col,
            score_col,
            score_direction=score_direction,
            random_state=random_state,
        )
        self.weight_factor = weight_factor

    def infer(self, df_approved: pd.DataFrame,
              df_rejected: pd.DataFrame,
              score_col: Optional[str] = None) -> pd.DataFrame:
        """
        Apply fuzzy augmentation.

        Parameters
        ----------
        df_approved : pandas.DataFrame
            Approved applications. Not modified; a copy gets ``_weight``.
        df_rejected : pandas.DataFrame
            Rejected applications. Not modified; two copies are emitted.
        score_col : str, optional
            Score column; falls back to ``self.score_col`` when falsy.

        Returns
        -------
        pandas.DataFrame
            Approved rows, then rejected-as-bad rows, then
            rejected-as-good rows, with a ``_weight`` column and a fresh
            0..n-1 index.
        """
        column = score_col or self.score_col

        approved = df_approved.copy()
        approved['_weight'] = 1.0

        # _bad_probability is inherited (not shown here); presumably it maps
        # scores to bad probabilities. Missing values become a neutral 0.5.
        p_bad = self._bad_probability(df_rejected[column]).fillna(0.5)
        p_bad_values = p_bad.to_numpy(dtype=float)
        scale = float(self.weight_factor)

        as_bad = df_rejected.copy()
        as_bad[self.target_col] = 1
        as_bad['_weight'] = p_bad_values * scale

        as_good = df_rejected.copy()
        as_good[self.target_col] = 0
        as_good['_weight'] = (1.0 - p_bad_values) * scale

        return pd.concat([approved, as_bad, as_good], ignore_index=True)

infer

infer(df_approved: DataFrame, df_rejected: DataFrame, score_col: Optional[str] = None) -> DataFrame

Apply fuzzy augmentation.

参数:

名称 类型 描述 默认
df_approved DataFrame

Approved applications.

必需
df_rejected DataFrame

Rejected applications.

必需
score_col str

Score column.

None

返回:

类型 描述
DataFrame

Combined data with inferred targets.

源代码位于: Modeling_Tool/Sample/Reject_Infer.py
def infer(self, df_approved: pd.DataFrame,
          df_rejected: pd.DataFrame,
          score_col: Optional[str] = None) -> pd.DataFrame:
    """
    Apply fuzzy augmentation.

    Approved rows get a ``_weight`` of 1. Each rejected row is emitted
    twice: as bad (target=1) weighted by its bad probability, and as good
    (target=0) weighted by the complement; both weights are multiplied by
    ``self.weight_factor``.

    Parameters
    ----------
    df_approved : pandas.DataFrame
        Approved applications. Not modified; a copy gets ``_weight``.
    df_rejected : pandas.DataFrame
        Rejected applications. Not modified; two copies are emitted.
    score_col : str, optional
        Score column; falls back to ``self.score_col`` when falsy.

    Returns
    -------
    pandas.DataFrame
        Approved rows, then rejected-as-bad rows, then rejected-as-good
        rows, with a ``_weight`` column and a fresh 0..n-1 index.
    """
    column = score_col or self.score_col

    approved = df_approved.copy()
    approved['_weight'] = 1.0

    # _bad_probability lives on the instance (not shown here); presumably it
    # maps scores to bad probabilities. Missing values become a neutral 0.5.
    p_bad = self._bad_probability(df_rejected[column]).fillna(0.5)
    p_bad_values = p_bad.to_numpy(dtype=float)
    scale = float(self.weight_factor)

    as_bad = df_rejected.copy()
    as_bad[self.target_col] = 1
    as_bad['_weight'] = p_bad_values * scale

    as_good = df_rejected.copy()
    as_good[self.target_col] = 0
    as_good['_weight'] = (1.0 - p_bad_values) * scale

    return pd.concat([approved, as_bad, as_good], ignore_index=True)

ParcelingInferrer

Bases: RejectInferrer

Parceling reject inference method.

Splits rejected applications into parcels based on score bands and assigns average bad rate from approved applications in each parcel.

参数:

名称 类型 描述 默认
n_parcels int

Number of score parcels.

10

示例:

>>> inferrer = ParcelingInferrer(n_parcels=5)
>>> df_combined = inferrer.infer(df_approved, df_rejected, 'score')
源代码位于: Modeling_Tool/Sample/Reject_Infer.py
class ParcelingInferrer(RejectInferrer):
    """
    Parceling reject inference method.

    Approved applications are split into quantile bands ("parcels") and
    the observed bad rate of each parcel is recorded. Rejected
    applications are placed into the same bands and receive a random 0/1
    target drawn with their parcel's bad rate.

    Parameters
    ----------
    target_col : str, default 'target'
        Target column name; forwarded to ``RejectInferrer``.
    score_col : str, default 'score'
        Default score column name; forwarded to ``RejectInferrer``.
    n_parcels : int, default 10
        Number of score parcels requested (duplicate band edges are
        dropped, so fewer may result).
    score_direction : str, default 'high_good'
        Forwarded to ``RejectInferrer``.
    random_state : int, optional
        Forwarded to ``RejectInferrer``.

    Attributes
    ----------
    parcel_rates_ : pandas.Series or None
        Bad rate per parcel from the last ``infer`` call.
    parcel_edges_ : numpy.ndarray or None
        Band edges from the last ``infer`` call; the outer edges are
        infinite.

    Examples
    --------
    >>> inferrer = ParcelingInferrer(n_parcels=5)
    >>> df_combined = inferrer.infer(df_approved, df_rejected, 'score')
    """

    def __init__(
        self,
        target_col: str = 'target',
        score_col: str = 'score',
        n_parcels: int = 10,
        score_direction: str = 'high_good',
        random_state: Optional[int] = None,
    ):
        """
        Initialize ParcelingInferrer.

        ``n_parcels`` is kept on the instance and the fitted attributes
        start out as ``None``; everything else goes to the base class
        initializer.
        """
        super().__init__(
            target_col,
            score_col,
            score_direction=score_direction,
            random_state=random_state,
        )
        self.n_parcels = n_parcels
        self.parcel_rates_ = None
        self.parcel_edges_ = None

    def infer(self, df_approved: pd.DataFrame,
              df_rejected: pd.DataFrame,
              score_col: Optional[str] = None) -> pd.DataFrame:
        """
        Apply parceling inference.

        Parameters
        ----------
        df_approved : pandas.DataFrame
            Approved applications with scores and targets. Not modified.
        df_rejected : pandas.DataFrame
            Rejected applications with scores. Not modified.
        score_col : str, optional
            Score column; falls back to ``self.score_col`` when falsy.

        Returns
        -------
        pandas.DataFrame
            Approved rows followed by the labelled rejected rows, with a
            fresh 0..n-1 index.
        """
        column = score_col or self.score_col

        approved = df_approved.copy()
        rejected = df_rejected.copy()

        # _bad_probability is inherited (not shown here); its output is the
        # quantity the bands are built on.
        approved_p = self._bad_probability(approved[column])
        try:
            parcel_ids, edges = pd.qcut(
                approved_p,
                q=self.n_parcels,
                labels=False,
                retbins=True,
                duplicates='drop',
            )
        except ValueError:
            # Binning failed: start from one parcel spanning the data range.
            parcel_ids = pd.Series(0, index=approved.index)
            edges = np.array([approved_p.min(), approved_p.max()], dtype=float)

        usable = (
            len(edges) >= 2
            and np.isfinite(edges).all()
            and edges[0] != edges[-1]
        )
        if usable:
            # Open the outer bands so rejected scores outside the approved
            # range still land in the first or last parcel.
            edges = np.asarray(edges, dtype=float)
            edges[0], edges[-1] = -np.inf, np.inf
        else:
            # Degenerate edges: one catch-all parcel covering every score.
            parcel_ids = pd.Series(0, index=approved.index)
            edges = np.array([-np.inf, np.inf], dtype=float)

        approved['_parcel'] = parcel_ids
        rates = approved.groupby('_parcel')[self.target_col].mean()
        self.parcel_rates_ = rates
        self.parcel_edges_ = edges

        rejected_p = self._bad_probability(rejected[column])
        rejected['_parcel'] = pd.cut(
            rejected_p,
            bins=edges,
            labels=False,
            include_lowest=True,
        )

        # Rows without a parcel rate fall back to the overall approved rate.
        overall_rate = approved[self.target_col].mean()
        p_bad = rejected['_parcel'].map(rates).fillna(overall_rate)
        draws = self._rng().binomial(1, p_bad.clip(0.0, 1.0).to_numpy(dtype=float))
        rejected[self.target_col] = draws

        # The helper column is internal and must not leak into the result.
        return pd.concat(
            [approved.drop(columns='_parcel'), rejected.drop(columns='_parcel')],
            ignore_index=True,
        )

infer

infer(df_approved: DataFrame, df_rejected: DataFrame, score_col: Optional[str] = None) -> DataFrame

Apply parceling inference.

参数:

名称 类型 描述 默认
df_approved DataFrame

Approved applications.

必需
df_rejected DataFrame

Rejected applications.

必需
score_col str

Score column.

None

返回:

类型 描述
DataFrame

Combined data with inferred targets.

源代码位于: Modeling_Tool/Sample/Reject_Infer.py
def infer(self, df_approved: pd.DataFrame,
          df_rejected: pd.DataFrame,
          score_col: Optional[str] = None) -> pd.DataFrame:
    """
    Apply parceling inference.

    Approved rows are split into quantile bands and the bad rate of each
    band is stored on ``self.parcel_rates_`` (edges on
    ``self.parcel_edges_``). Rejected rows are placed into the same bands
    and receive a random 0/1 target drawn with their band's bad rate.

    Parameters
    ----------
    df_approved : pandas.DataFrame
        Approved applications with scores and targets. Not modified.
    df_rejected : pandas.DataFrame
        Rejected applications with scores. Not modified.
    score_col : str, optional
        Score column; falls back to ``self.score_col`` when falsy.

    Returns
    -------
    pandas.DataFrame
        Approved rows followed by the labelled rejected rows, with a
        fresh 0..n-1 index.
    """
    column = score_col or self.score_col

    approved = df_approved.copy()
    rejected = df_rejected.copy()

    # _bad_probability lives on the instance (not shown here); its output is
    # the quantity the bands are built on.
    approved_p = self._bad_probability(approved[column])
    try:
        parcel_ids, edges = pd.qcut(
            approved_p,
            q=self.n_parcels,
            labels=False,
            retbins=True,
            duplicates='drop',
        )
    except ValueError:
        # Binning failed: start from one parcel spanning the data range.
        parcel_ids = pd.Series(0, index=approved.index)
        edges = np.array([approved_p.min(), approved_p.max()], dtype=float)

    usable = (
        len(edges) >= 2
        and np.isfinite(edges).all()
        and edges[0] != edges[-1]
    )
    if usable:
        # Open the outer bands so rejected scores outside the approved
        # range still land in the first or last parcel.
        edges = np.asarray(edges, dtype=float)
        edges[0], edges[-1] = -np.inf, np.inf
    else:
        # Degenerate edges: one catch-all parcel covering every score.
        parcel_ids = pd.Series(0, index=approved.index)
        edges = np.array([-np.inf, np.inf], dtype=float)

    approved['_parcel'] = parcel_ids
    rates = approved.groupby('_parcel')[self.target_col].mean()
    self.parcel_rates_ = rates
    self.parcel_edges_ = edges

    rejected_p = self._bad_probability(rejected[column])
    rejected['_parcel'] = pd.cut(
        rejected_p,
        bins=edges,
        labels=False,
        include_lowest=True,
    )

    # Rows without a parcel rate fall back to the overall approved rate.
    overall_rate = approved[self.target_col].mean()
    p_bad = rejected['_parcel'].map(rates).fillna(overall_rate)
    draws = self._rng().binomial(1, p_bad.clip(0.0, 1.0).to_numpy(dtype=float))
    rejected[self.target_col] = draws

    # The helper column is internal and must not leak into the result.
    return pd.concat(
        [approved.drop(columns='_parcel'), rejected.drop(columns='_parcel')],
        ignore_index=True,
    )

RejectInferenceFactory

Factory class for creating reject inference methods.

示例:

>>> inferrer = RejectInferenceFactory.create('parceling', n_parcels=5)
>>> inferrer = RejectInferenceFactory.create('fuzzy', weight_factor=0.9)
源代码位于: Modeling_Tool/Sample/Reject_Infer.py
class RejectInferenceFactory:
    """
    Factory class for creating reject inference methods.

    Method names are matched case-insensitively against a registry that
    maps several aliases onto the same inferrer class.

    Examples
    --------
    >>> inferrer = RejectInferenceFactory.create('parceling', n_parcels=5)
    >>> inferrer = RejectInferenceFactory.create('fuzzy', weight_factor=0.9)
    """

    # Alias -> inferrer class. Several aliases may share one class.
    _methods = {
        'simple': SimpleAugmentInferrer,
        'augment': SimpleAugmentInferrer,
        'hard': HardCutoffInferrer,
        'hardcutoff': HardCutoffInferrer,
        'fuzzy': FuzzyAugmentInferrer,
        'parceling': ParcelingInferrer,
        'parcel': ParcelingInferrer
    }

    @classmethod
    def create(cls, method: str = 'parceling', **kwargs) -> RejectInferrer:
        """
        Create a reject inference method.

        Parameters
        ----------
        method : str, default 'parceling'
            Method name or alias; lower-cased before lookup.
        **kwargs
            Additional parameters passed to the inferrer's constructor.

        Returns
        -------
        RejectInferrer
            Instantiated reject inferrer.

        Raises
        ------
        ValueError
            If method name is not recognized. The message lists the
            known names in alphabetical order.
        """
        method_lower = method.lower()
        if method_lower not in cls._methods:
            # Sorted so the message is identical from run to run; a set's
            # iteration order for strings is not.
            raise ValueError(
                f"Unknown method '{method}'. "
                f"Available: {sorted(cls._methods)}"
            )
        return cls._methods[method_lower](**kwargs)

    @classmethod
    def available_methods(cls) -> List[str]:
        """
        Get list of available methods.

        Returns
        -------
        list of str
            Every registered method name and alias, in alphabetical
            order.
        """
        # Dict keys are already unique; sorting makes the order reproducible.
        return sorted(cls._methods)

create classmethod

create(method: str = 'parceling', **kwargs) -> RejectInferrer

Create a reject inference method.

参数:

名称 类型 描述 默认
method str

Method name.

'parceling'
**kwargs

Additional parameters for the method.

{}

返回:

类型 描述
RejectInferrer

Instantiated reject inferrer.

引发:

类型 描述
ValueError

If method name is not recognized.

源代码位于: Modeling_Tool/Sample/Reject_Infer.py
@classmethod
def create(cls, method: str = 'parceling', **kwargs) -> RejectInferrer:
    """
    Create a reject inference method.

    Parameters
    ----------
    method : str, default 'parceling'
        Method name or alias; lower-cased before lookup in
        ``cls._methods``.
    **kwargs
        Additional parameters passed to the inferrer's constructor.

    Returns
    -------
    RejectInferrer
        Instantiated reject inferrer.

    Raises
    ------
    ValueError
        If method name is not recognized. The message lists the known
        names in alphabetical order.
    """
    method_lower = method.lower()
    if method_lower not in cls._methods:
        # Sorted so the message is identical from run to run; a set's
        # iteration order for strings is not.
        raise ValueError(
            f"Unknown method '{method}'. "
            f"Available: {sorted(cls._methods)}"
        )
    return cls._methods[method_lower](**kwargs)

available_methods classmethod

available_methods() -> List[str]

Get list of available methods.

返回:

类型 描述
list of str

Available method names.

源代码位于: Modeling_Tool/Sample/Reject_Infer.py
@classmethod
def available_methods(cls) -> List[str]:
    """
    Get list of available methods.

    Returns
    -------
    list of str
        Every method name and alias registered in ``cls._methods``, in
        alphabetical order.
    """
    # Dict keys are already unique; sorting makes the order reproducible,
    # which going through a set did not.
    return sorted(cls._methods)

分布适配 — Distribution_Adaptation

Distribution_Adaptation

DistributionAdaptation

源代码位于: Modeling_Tool/Sample/Distribution_Adaptation.py
class DistributionAdaptation:
    """
    Re-weight training samples so they resemble an out-of-time (OOT) sample.

    ``fit`` stores one weight per training row in ``sample_weights``;
    ``get_weights`` returns them.
    """

    def __init__(self, method='density_ratio'):
        """
        Initialize the distribution adapter.

        Parameters
        ----------
        method : str, default 'density_ratio'
            Weighting strategy used by ``fit``:

            - 'density_ratio': kernel-density ratio estimate.
            - 'covariate_shift': domain-classifier importance weights.
            - any other value (including 'kl_divergence', which has no
              dedicated implementation): average of the two above.
        """
        self.method = method
        self.sample_weights = None
        self.feature_importances = None

    def estimate_density_ratio(self, X_train, X_oot):
        """
        Compute sample weights from a kernel-density ratio.

        A simplified stand-in for KLIEP/KMM-style estimators: one
        Gaussian KDE (bandwidth 0.5) is fitted per sample and the ratio
        p_oot(x) / p_train(x) is evaluated at every training row.

        Returns
        -------
        numpy.ndarray
            One weight per row of ``X_train``, clipped to [0.1, 10] and
            then scaled to mean 1.
        """
        from sklearn.neighbors import KernelDensity

        # Estimate both densities with a Gaussian KDE.
        kde_train = KernelDensity(kernel='gaussian', bandwidth=0.5)
        kde_oot = KernelDensity(kernel='gaussian', bandwidth=0.5)

        kde_train.fit(X_train)
        kde_oot.fit(X_oot)

        # Density ratio p_oot(x) / p_train(x), evaluated on the training rows.
        log_density_train = kde_train.score_samples(X_train)
        log_density_oot = kde_oot.score_samples(X_train)

        # Subtract in log space and exponentiate once instead of dividing
        # two possibly tiny densities.
        density_ratio = np.exp(log_density_oot - log_density_train)

        # Clip extreme ratios.
        density_ratio = np.clip(density_ratio, 0.1, 10)

        # Normalize to mean 1.
        density_ratio = density_ratio / density_ratio.mean()

        return density_ratio

    def covariate_shift_weighting(self, X_train, X_oot):
        """
        Compute importance weights with a domain classifier.

        A logistic regression is trained to tell training rows (label 0)
        from OOT rows (label 1); each training row is weighted by the
        predicted odds of belonging to the OOT sample.

        Returns
        -------
        numpy.ndarray
            One weight per row of ``X_train``, clipped to [0.1, 10] and
            then scaled to mean 1.
        """
        from sklearn.linear_model import LogisticRegression

        # Domain labels: 0 for training rows, 1 for OOT rows.
        n_train = len(X_train)
        n_oot = len(X_oot)

        X_combined = np.vstack([X_train, X_oot])
        y_domain = np.hstack([np.zeros(n_train), np.ones(n_oot)])

        # Train the domain classifier.
        domain_classifier = LogisticRegression(
            C=1.0, max_iter=1000, random_state=42
        )
        domain_classifier.fit(X_combined, y_domain)

        # Probability that each training row comes from the OOT sample.
        p_oot = domain_classifier.predict_proba(X_train)[:, 1]

        # Weight = p(oot|x) / p(train|x); epsilon guards against a zero
        # denominator when p_oot reaches 1.
        epsilon = 1e-10
        weights = p_oot / (1 - p_oot + epsilon)

        # Clip extreme weights, then normalize to mean 1.
        weights = np.clip(weights, 0.1, 10)
        weights = weights / weights.mean()

        return weights

    def fit(self, X_train, X_oot, y_train=None):
        """
        Compute sample weights that adapt the training set to the OOT set.

        Parameters
        ----------
        X_train, X_oot
            Feature matrices of the training and OOT samples.
        y_train : optional
            Accepted but not used.

        Returns
        -------
        DistributionAdaptation
            ``self``, with ``sample_weights`` populated.
        """
        if self.method == 'density_ratio':
            self.sample_weights = self.estimate_density_ratio(X_train, X_oot)
        elif self.method == 'covariate_shift':
            self.sample_weights = self.covariate_shift_weighting(X_train, X_oot)
        else:
            # Any other method name falls back to the hybrid of both.
            w1 = self.estimate_density_ratio(X_train, X_oot)
            w2 = self.covariate_shift_weighting(X_train, X_oot)
            self.sample_weights = (w1 + w2) / 2

        return self

    def get_weights(self):
        """Return the sample weights computed by ``fit`` (``None`` before)."""
        return self.sample_weights

    def visualize_distribution_comparison(self, X_train, X_oot, features=None, n_features=5):
        """
        Plot train-vs-OOT density curves for a handful of features.

        Parameters
        ----------
        X_train, X_oot : array-like, 2-D
            Feature matrices; converted with ``numpy.asarray``.
        features : sequence of int, optional
            Column indices to plot. Defaults to the ``n_features``
            columns of ``X_train`` with the largest variance.
        n_features : int, default 5
            Maximum number of features to plot.

        Raises
        ------
        ValueError
            If ``n_features`` is smaller than 1.

        Notes
        -----
        One subplot is created per feature actually plotted, so passing
        fewer ``features`` than ``n_features`` leaves no empty panels.
        Nothing is drawn when ``features`` is empty.
        """
        if n_features < 1:
            # A zero count would also make the [-n_features:] slice below
            # select every column instead of none.
            raise ValueError("n_features must be a positive integer")

        X_train = np.asarray(X_train)
        X_oot = np.asarray(X_oot)

        if features is None:
            # Pick the features with the largest variance.
            variances = np.var(X_train, axis=0)
            features = np.argsort(variances)[-n_features:]

        selected = list(features)[:n_features]
        if not selected:
            return

        n_plots = len(selected)
        # squeeze=False always yields a 2-D axes array, so a single plot
        # needs no special case.
        _fig, axes = plt.subplots(1, n_plots, figsize=(5 * n_plots, 4), squeeze=False)

        for ax, feature_idx in zip(axes[0], selected):
            # Overlay the two distributions.
            sns.kdeplot(X_train[:, feature_idx], ax=ax, label='Train', fill=True, alpha=0.5)
            sns.kdeplot(X_oot[:, feature_idx], ax=ax, label='OOT', fill=True, alpha=0.5)

            ax.set_title(f'Feature {feature_idx}')
            ax.legend()
            ax.set_xlabel('Value')
            ax.set_ylabel('Density')

        plt.tight_layout()
        plt.show()

estimate_density_ratio

estimate_density_ratio(X_train, X_oot)

使用密度比估计方法计算样本权重 KLIEP/KMM等方法的简化实现

源代码位于: Modeling_Tool/Sample/Distribution_Adaptation.py
def estimate_density_ratio(self, X_train, X_oot):
    """
    Compute sample weights from a kernel-density ratio.

    A simplified stand-in for KLIEP/KMM-style estimators: one Gaussian
    KDE (bandwidth 0.5) is fitted per sample and p_oot(x) / p_train(x) is
    evaluated at every training row. The result is clipped to [0.1, 10]
    and then scaled to mean 1.
    """
    from sklearn.neighbors import KernelDensity

    # One Gaussian KDE per sample.
    train_kde = KernelDensity(kernel='gaussian', bandwidth=0.5)
    oot_kde = KernelDensity(kernel='gaussian', bandwidth=0.5)
    train_kde.fit(X_train)
    oot_kde.fit(X_oot)

    # Both densities are evaluated on the training rows; the ratio is formed
    # as a difference of log densities and exponentiated once.
    log_p_train = train_kde.score_samples(X_train)
    log_p_oot = oot_kde.score_samples(X_train)
    ratio = np.exp(log_p_oot - log_p_train)

    # Clip extreme ratios, then normalize to mean 1.
    clipped = np.clip(ratio, 0.1, 10)
    return clipped / clipped.mean()

covariate_shift_weighting

covariate_shift_weighting(X_train, X_oot)

使用领域分类器估计样本重要性权重

源代码位于: Modeling_Tool/Sample/Distribution_Adaptation.py
def covariate_shift_weighting(self, X_train, X_oot):
    """
    Compute importance weights with a domain classifier.

    A logistic regression is trained to tell training rows (label 0) from
    OOT rows (label 1); each training row is weighted by the predicted
    odds of belonging to the OOT sample. The result is clipped to
    [0.1, 10] and then scaled to mean 1.
    """
    from sklearn.linear_model import LogisticRegression

    # Stack both samples and label them by origin: 0 = train, 1 = OOT.
    stacked = np.vstack([X_train, X_oot])
    origin = np.hstack([np.zeros(len(X_train)), np.ones(len(X_oot))])

    classifier = LogisticRegression(C=1.0, max_iter=1000, random_state=42)
    classifier.fit(stacked, origin)

    # Probability that each training row comes from the OOT sample.
    p_oot = classifier.predict_proba(X_train)[:, 1]

    # Odds p(oot|x) / p(train|x); the small constant guards against a zero
    # denominator when p_oot reaches 1.
    eps = 1e-10
    odds = p_oot / (1 - p_oot + eps)

    # Clip extreme weights, then normalize to mean 1.
    clipped = np.clip(odds, 0.1, 10)
    return clipped / clipped.mean()

fit

fit(X_train, X_oot, y_train=None)

计算适应OOT分布的样本权重

源代码位于: Modeling_Tool/Sample/Distribution_Adaptation.py
def fit(self, X_train, X_oot, y_train=None):
    """
    Compute sample weights that adapt the training set to the OOT set.

    ``self.method`` selects the estimator: 'density_ratio' or
    'covariate_shift'; any other value averages the two. ``y_train`` is
    accepted but not used. The weights are stored on
    ``self.sample_weights`` and ``self`` is returned.
    """
    chosen = self.method
    if chosen == 'density_ratio':
        weights = self.estimate_density_ratio(X_train, X_oot)
    elif chosen == 'covariate_shift':
        weights = self.covariate_shift_weighting(X_train, X_oot)
    else:
        # Fallback: hybrid of both estimators.
        from_density = self.estimate_density_ratio(X_train, X_oot)
        from_classifier = self.covariate_shift_weighting(X_train, X_oot)
        weights = (from_density + from_classifier) / 2

    self.sample_weights = weights
    return self

get_weights

get_weights()

返回样本权重

源代码位于: Modeling_Tool/Sample/Distribution_Adaptation.py
def get_weights(self):
    """
    Return the stored sample weights.

    Hands back ``self.sample_weights`` unchanged.
    """
    return self.sample_weights

visualize_distribution_comparison

visualize_distribution_comparison(X_train, X_oot, features=None, n_features=5)

可视化训练集和OOT集的分布差异

源代码位于: Modeling_Tool/Sample/Distribution_Adaptation.py
def visualize_distribution_comparison(self, X_train, X_oot, features=None, n_features=5):
    """
    Plot train-vs-OOT density curves for a handful of features.

    Parameters
    ----------
    X_train, X_oot : array-like, 2-D
        Feature matrices; converted with ``numpy.asarray``.
    features : sequence of int, optional
        Column indices to plot. Defaults to the ``n_features`` columns of
        ``X_train`` with the largest variance.
    n_features : int, default 5
        Maximum number of features to plot.

    Raises
    ------
    ValueError
        If ``n_features`` is smaller than 1.

    Notes
    -----
    One subplot is created per feature actually plotted, so passing fewer
    ``features`` than ``n_features`` leaves no empty panels. Nothing is
    drawn when ``features`` is empty.
    """
    if n_features < 1:
        # A zero count would also make the [-n_features:] slice below
        # select every column instead of none.
        raise ValueError("n_features must be a positive integer")

    X_train = np.asarray(X_train)
    X_oot = np.asarray(X_oot)

    if features is None:
        # Pick the features with the largest variance.
        variances = np.var(X_train, axis=0)
        features = np.argsort(variances)[-n_features:]

    selected = list(features)[:n_features]
    if not selected:
        return

    n_plots = len(selected)
    # squeeze=False always yields a 2-D axes array, so a single plot needs
    # no special case.
    _fig, axes = plt.subplots(1, n_plots, figsize=(5 * n_plots, 4), squeeze=False)

    for ax, feature_idx in zip(axes[0], selected):
        # Overlay the two distributions.
        sns.kdeplot(X_train[:, feature_idx], ax=ax, label='Train', fill=True, alpha=0.5)
        sns.kdeplot(X_oot[:, feature_idx], ax=ax, label='OOT', fill=True, alpha=0.5)

        ax.set_title(f'Feature {feature_idx}')
        ax.legend()
        ax.set_xlabel('Value')
        ax.set_ylabel('Density')

    plt.tight_layout()
    plt.show()