11 生存分析

核心问题:如何分析”等待时间”?

生存分析(Survival Analysis)研究从某个起始时间到某个事件发生所经历的时间

  • 医学:患者确诊后的存活时间
  • 商业:客户首次购买后的流失时间
  • 工程:设备投产后的故障时间
  • 金融:上市公司从IPO到退市的存续时间

关键挑战:数据中存在删失(Censoring)——在观察结束时,事件尚未发生。

为什么删失样本不能直接丢掉?

如果把所有删失样本都删除,会同时带来两个严重问题:

  • 样本量骤降:大量仍在存续的公司会被浪费掉
  • 幸存信息丢失:‘至少活到今天’ 这部分信息被忽略
  • 估计系统性偏短:只看已退市公司,会错误地觉得企业都’死得很快’
处理方式 会发生什么 为什么有问题
只保留已退市公司 平均存续期被低估 因为长期存活者被整体删掉
把未退市公司当作’永不退市’ 生存概率被高估 因为未来仍可能发生事件
显式建模删失 同时利用完整样本与删失样本 这是生存分析的正确思路

一句话记忆:删失样本不是’没用的数据’,而是’只知道下界的信息’。

生存分析与传统方法的本质区别

传统分类:公司会不会退市?(Yes/No)

生存分析:公司什么时候退市?退市风险如何随时间变化

特征 传统方法 生存分析
目标变量 二元(0/1) 时间 + 事件
信息利用 观测期末状态 全过程信息
处理删失 不能 核心能力
输出 概率 时间相关概率曲线

金融应用场景

  • 信用债违约时间预测
  • IPO后多久达到峰值
  • 基金存续期分析
  • 投资组合回撤恢复时间

生存时间与删失时间

\(T\)真实事件时间(如退市时间),\(C\)删失时间(如研究截止时间)。

我们实际能观测到的是:

\[Y = \min(T, C)\]

以及事件指示变量

\[\delta = \begin{cases} 1, & T \leq C \quad (\text{事件已发生}) \\ 0, & T > C \quad (\text{被删失}) \end{cases}\]

直觉理解:假设你追踪了100家上市公司,5年后有10家退市(\(\delta=1\)),90家仍在交易(\(\delta=0\),被删失)。你不能忽略这90家——它们贡献了”至少存活5年”的信息。

删失的数学处理

观测数据的表示

每个观测 \(i\) 包含:

  • \(T_i\):观测到的时间(事件时间或删失时间)
  • \(\delta_i\):事件指示变量
    • \(\delta_i = 1\):事件发生(未删失)
    • \(\delta_i = 0\)删失(观测截止时事件未发生)

关键假设:独立删失

\[ P(\text{event at } t \mid \text{censored at } t) = P(\text{event at } t) \]

即:删失的发生不能与事件相关

违反示例:如果高风险客户更容易提前还清贷款(逃避监控) → 删失与违约相关 → 分析结果有偏

删失类型:右删失、左删失与区间删失

类型 定义 金融场景示例
右删失 只知道 \(T > C\) 研究截止时公司仍未退市
左删失 只知道 \(T < L\) 只知道公司在数据收集前已违约
区间删失 只知道 \(T \in (L, R]\) 只知道违约发生在两次财报之间
  • 实际分析中,右删失最为常见
  • 独立删失假设:删失机制不能携带关于事件时间的信息

风险集:每个时点到底在和谁比较?

风险集(Risk Set) 指的是在时点 \(t\) 到来之前,仍然有可能发生事件的个体集合。

对公司退市案例来说,时点 \(t\) 的风险集包括:

  • 已经上市
  • \(t\) 时刻之前还没有退市
  • \(t\) 时刻之前也没有因为观察结束而退出样本
公司状态 是否进入时点 \(t\) 的风险集 原因
已在更早时点退市 已经发生事件
在更早时点被删失 后续不再可观测
\(t\) 时刻仍存续 仍然’暴露在风险中’

为什么这个概念重要?

  • Kaplan-Meier 要用它计算条件生存概率
  • 对数秩检验要用它计算期望事件数
  • Cox 模型的偏似然也完全建立在风险集之上

风险集要按时间点一轮一轮更新

风险集不是一次性算完的固定名单,而是会随着时间向前推进不断更新。

最小更新顺序

  1. 先从所有已经进入观察窗口的个体开始
  2. 到某个事件时点时,只保留此前仍未发生事件、也未删失的个体
  3. 用这批“此刻还在场上的人”完成这一时点的比较
  4. 这一时点事件发生后,把已发生事件的个体移出后续风险集
  5. 如果有人在下一事件时点前被删失,他们只参与前面的比较,不再进入后面的风险集

一句话抓手:每到一个新时点,都要先重问一次“现在还剩谁在场上”。

生存函数与Kaplan-Meier估计量

生存函数 \(S(t)\) 定义为事件时间超过 \(t\) 的概率:

\[S(t) = \Pr(T > t)\]

Kaplan-Meier 乘积极限估计量

\[\hat{S}(t) = \prod_{t_j \leq t} \left(1 - \frac{d_j}{n_j}\right)\]

  • \(t_j\):第 \(j\) 个事件发生时间
  • \(d_j\):在 \(t_j\) 时刻发生的事件数
  • \(n_j\):在 \(t_j\) 时刻仍处于”风险集”中的个体数

读 KM 公式的顺序

  • 先找风险集 \(n_j\):此刻还有谁真正’在场’
  • 再看事件数 \(d_j\):这一关到底退掉了多少个体
  • 最后把每一段 \(1-d_j/n_j\) 连乘:整体生存概率就是一路过关的累计结果

Kaplan-Meier 为什么是’条件生存概率的连乘’?

把’活过 10 年’拆开看,其实等于一串条件概率的连乘:

\[ \large{P(T > 10) = P(T > t_1) \times P(T > t_2 \mid T > t_1) \times \cdots} \]

一个简单例子:

  1. 起初有 100 家公司,第 3 年退市 2 家,则这一段的条件生存概率约为 \(98/100\)
  2. 到第 5 年前,又有 5 家公司因为观察结束被删失,所以风险集只剩 93 家
  3. 第 5 年退市 3 家,则这一段的条件生存概率约为 \(90/93\)
  4. 因此活过第 5 年的概率估计为 \((98/100) \times (90/93)\)

核心直觉

  • KM 不是一次性估计’最终能活多久’
  • 而是在每个事件时点上更新一次’这一关能不能过’
  • 把所有’过关概率’乘起来,得到整条生存曲线

Kaplan-Meier 的手工计算顺序其实只有四步

如果让你不用软件、手工更新一次 KM 曲线,可以固定按下面顺序做:

  1. 先把真正发生事件的时间点按先后排好
  2. 到每个事件时点时,先数此刻风险集大小 \(n_j\)
  3. 再数这一时点实际发生事件的人数 \(d_j\)
  4. 用上一时点的生存率乘上 \(\bigl(1-d_j/n_j\bigr)\),得到新的生存率

删失样本的处理顺序

  • 删失不会直接让曲线在当下往下跳
  • 它只会让后续时点的风险集变小
  • 所以删失影响的是后面各步的分母,不是当前这一步的事件数

最容易错的地方:先把删失样本当成事件,或者在错误的时点把它们提前移出风险集。

案例:A股上市公司退市生存分析

我们使用本地 stock_basic_data.h5 数据,分析A股上市公司从上市到退市的生存情况。

import scipy.integrate  # 导入scipy积分模块以添加向后兼容性补丁
scipy.integrate.trapz = scipy.integrate.trapezoid  # 为旧版lifelines添加trapz别名(scipy 1.14+已移除trapz)
import pandas as pd  # 导入pandas用于数据处理
import numpy as np  # 导入numpy用于数值计算
import matplotlib.pyplot as plt  # 导入matplotlib用于可视化
import os  # 导入os模块用于跨平台路径

plt.rcParams['font.family'] = ['Source Han Serif SC', 'Noto Serif CJK SC', 'SimSun']  # 设置中文字体优先级
plt.rcParams['axes.unicode_minus'] = False  # 解决负号显示问题

DATA_DIR = 'C:/qiufei/data' if os.name == 'nt' else '/home/ubuntu/r2_data_mount/data'  # 跨平台数据路径

stock_basic_data = pd.read_hdf(os.path.join(DATA_DIR, 'stock/stock_basic_data.h5'))  # 读取上市公司基本信息
stock_basic_data['listed_date'] = pd.to_datetime(stock_basic_data['listed_date'], errors='coerce')  # 转换上市日期格式
stock_basic_data['de_listed_date'] = pd.to_datetime(stock_basic_data['de_listed_date'], errors='coerce')  # 转换退市日期格式
stock_basic_data = stock_basic_data.dropna(subset=['listed_date'])  # 去除缺少上市日期的无效记录

构建生存数据集

observation_end_date = pd.to_datetime('2024-01-01')  # 设定研究观测截止日期
company_survival_records = []  # 初始化列表存储生存记录

for _, company_row in stock_basic_data.iterrows():  # 遍历每家上市公司
    listed_date = company_row['listed_date']  # 获取上市日期
    delisted_date = company_row['de_listed_date']  # 获取退市日期
    if pd.isna(delisted_date) or delisted_date >= observation_end_date:  # 未退市视为删失
        duration_years = (observation_end_date - listed_date).days / 365.25  # 计算存续年数
        event_indicator = 0  # 标记为删失(未退市)
    else:  # 已退市视为事件发生
        duration_years = (delisted_date - listed_date).days / 365.25  # 计算存续年数
        event_indicator = 1  # 标记为事件(已退市)
    if duration_years > 0:  # 排除异常负值记录
        company_survival_records.append({  # 添加生存记录
            'order_book_id': company_row['order_book_id'],  # 股票代码
            'industry': company_row.get('citics_2019_l1_name', 'Unknown'),  # 中信一级行业
            'duration': duration_years,  # 存续时间(年)
            'event': event_indicator  # 事件标记
        })  # 完成记录添加

company_survival_data = pd.DataFrame(company_survival_records)  # 转换为DataFrame
print(f'总样本量: {len(company_survival_data)}')  # 输出总样本量
print(f'退市公司数: {company_survival_data["event"].sum():.0f}')  # 输出退市数量
print(f'退市率: {company_survival_data["event"].mean():.2%}')  # 输出退市比例
总样本量: 5334
退市公司数: 237
退市率: 4.44%

A股整体退市生存曲线

Code
from lifelines import KaplanMeierFitter  # 导入Kaplan-Meier生存曲线估计器

kaplan_meier_fitter = KaplanMeierFitter()  # 实例化KM估计器
kaplan_meier_fitter.fit(  # 拟合整体生存曲线
    company_survival_data['duration'],  # 存续时间
    company_survival_data['event'],  # 事件标记
    label='A股上市公司整体'  # 图例标签
)  # 完成拟合

fig, ax = plt.subplots(figsize=(10, 6))  # 创建画布
kaplan_meier_fitter.plot(ax=ax)  # 绘制KM生存曲线
ax.set_title('A股上市公司退市生存曲线', fontsize=14)  # 设置图标题
ax.set_xlabel('上市年数', fontsize=12)  # 设置x轴标签
ax.set_ylabel('生存概率', fontsize=12)  # 设置y轴标签
ax.grid(True, alpha=0.3)  # 添加网格线
plt.tight_layout()  # 自动调整布局
plt.show()  # 显示图形
Figure 1: A股上市公司整体退市生存曲线

按行业分组的生存曲线

Code
industry_counts = company_survival_data['industry'].value_counts()  # 统计各行业样本量
top_industry_categories = industry_counts.head(5).index.tolist()  # 选取前5大行业

fig, ax = plt.subplots(figsize=(10, 6))  # 创建画布

for ind in top_industry_categories:  # 遍历前5大行业
    mask = company_survival_data['industry'] == ind  # 筛选当前行业
    kaplan_meier_fitter.fit(  # 拟合当前行业KM曲线
        company_survival_data.loc[mask, 'duration'],  # 存续时间
        company_survival_data.loc[mask, 'event'],  # 事件标记
        label=ind  # 行业名称作为图例
    )  # 完成拟合
    kaplan_meier_fitter.plot(ax=ax)  # 绘制到同一画布

ax.set_title('不同行业A股的退市生存曲线比较', fontsize=14)  # 设置图标题
ax.set_xlabel('上市年数', fontsize=12)  # x轴标签
ax.grid(True, alpha=0.3)  # 添加网格线
plt.legend(prop={'family': 'Source Han Serif SC'})  # 设置图例中文字体
plt.tight_layout()  # 自动调整布局
plt.show()  # 显示图形
Figure 2: 不同行业A股退市生存曲线比较

对数秩检验:行业间差异是否显著?

仅靠”目测”曲线分离来下结论是危险的——我们需要对数秩检验(Log-Rank Test)

Code
from lifelines.statistics import logrank_test  # 导入对数秩检验

industry_first = top_industry_categories[0]  # 样本量最大的行业
industry_second = top_industry_categories[1]  # 样本量第二大的行业

mask_first = company_survival_data['industry'] == industry_first  # 第一行业掩码
mask_second = company_survival_data['industry'] == industry_second  # 第二行业掩码

rank_test_results = logrank_test(  # 执行对数秩检验
    company_survival_data.loc[mask_first, 'duration'],  # 行业1存续时间
    company_survival_data.loc[mask_second, 'duration'],  # 行业2存续时间
    company_survival_data.loc[mask_first, 'event'],  # 行业1事件标记
    company_survival_data.loc[mask_second, 'event']  # 行业2事件标记
)  # 完成检验

print(f'比较行业: {industry_first} vs {industry_second}')  # 输出比较的行业名
print(f'对数秩检验统计量: {rank_test_results.test_statistic:.4f}')  # 输出检验统计量
print(f'p值: {rank_test_results.p_value:.4f}')  # 输出p值
比较行业: 机械 vs 基础化工
对数秩检验统计量: 0.9421
p值: 0.3317
Figure 3

对数秩检验在比较什么?

对数秩检验并不是在比较’两条线看起来像不像’,而是在问:

如果两组真的没有差异,那么在每个事件时点,各组本来应该分到多少个事件?

它的比较逻辑是:

  1. 在每个事件时点先看风险集里有多少人
  2. 按各组在风险集中的占比,算出’期望事件数’
  3. 再和’实际事件数’对比
  4. 如果很多时点都朝同一个方向偏离,累计起来就说明两组曲线确实不同
观察现象 统计含义
实际事件数大致等于期望事件数 两组差异可能只是随机波动
某组长期高于期望事件数 该组风险系统性更高
偏离方向时正时负 没有稳定差异证据

对数秩检验的数学原理

研究问题:两组的生存曲线是否有显著差异

\[ H_0: S_1(t) = S_2(t) \quad \forall t \]

检验统计量

\[ W = \frac{\sum_{j=1}^K (d_{1j} - E_{1j})}{\sqrt{\sum_{j=1}^K V_{1j}}} \]

其中:

  • \(d_{1j}\):第1组在 \(t_j\) 时刻的实际事件数
  • \(E_{1j} = n_{1j} \cdot \frac{d_j}{n_j}\):期望事件数
  • 大样本下 \(W \sim N(0,1)\)

金融解读:比较不同行业板块的退市速度差异,或不同信用等级的违约速度差异

Cox模型:量化风险因素的影响

Cox 比例风险模型将风险函数分解为两部分:

\[h(t|x_i) = h_0(t) \exp(x_i^T \beta)\]

  • \(h_0(t)\)基线风险函数(所有协变量为0时的基准风险)
  • \(\exp(x_i^T \beta)\):相对于基准的风险乘数

革命性设计:估计 \(\beta\)完全不需要假设 \(h_0(t)\) 的具体形式!

Cox 在每个事件时点都做一场局部比较

Cox 模型并不是一上来就把全样本硬塞进一个总比较里,而是在每个事件时点重复做同一种局部比较。

这场局部比较的顺序

  1. 固定一个真正发生事件的时点 \(t_i\)
  2. 找出这一时点仍在风险集中的所有个体
  3. 计算每个个体的相对风险分数 \(\exp(x^T\beta)\)
  4. 问“为什么偏偏是这一个个体在此刻发生事件”,也就是把该个体分数除以整组分数和
  5. 对所有事件时点重复这个动作,再把这些条件概率连乘起来

理解抓手:Cox 不是在比较“谁最终活得久”,而是在比较“每一次出事时,谁在当下更像那个会出事的人”。

比例风险假设到底是什么意思?

所谓比例风险(Proportional Hazards),不是说各组风险不变,而是说:

两组之间的风险倍数大致保持稳定。

例如:

  • 低杠杆公司在第 2 年的瞬时退市风险是 1%
  • 高杠杆公司在第 2 年的瞬时退市风险是 1.5%
  • 如果比例风险假设成立,那么到了第 6 年,二者也应大致保持 1.5 倍左右的关系
说法 是否正确
‘风险率是常数’
‘风险随时间可以变化’
‘不同组之间的风险倍数不随时间大幅改变’ 是,这才是比例风险假设

因此:Cox 模型允许时间趋势复杂变化,但要求组间相对差异保持稳定。

Cox模型的核心假设与解读

Cox模型将风险率建模为基线风险与协变量效应的乘积:

\[ h(t|X) = h_0(t) \cdot \exp(\beta_1 X_1 + \cdots + \beta_p X_p) \]

风险比的解读

\(\exp(\beta_j)\) 含义
\(= 1\) 变量 \(X_j\) 对风险无影响
\(> 1\) \(X_j\) 增加1单位,风险增加 \((\exp(\beta_j)-1)\times 100\%\)
\(< 1\) \(X_j\) 增加1单位,风险降低

比例风险假设

\[ \frac{h(t|X_a)}{h(t|X_b)} = \text{常数(不随时间变化)} \]

检验方法:Schoenfeld残差检验

风险比不是’概率比’,也不是’时间比’

读 Cox 结果时,最常见的误读有两个:

  • 把风险比当成’违约概率翻了多少倍’
  • 把风险比当成’存活时间缩短了多少倍’

这两种说法都不严格。

指标 回答的问题
风险比 HR 在同一时点上,谁的瞬时风险更高?
概率差/概率比 到某个截止时间前,谁更可能发生事件?
时间比 两组平均能活多久的比例是多少?

因此:当 \(HR = 1.3\) 时,更准确的说法是:

在其他条件相同的情况下,该变量对应的个体在任一给定时点上的事件发生强度约高出 30%。

偏似然函数:基线风险函数的巧妙消去

补充说明:偏似然函数推导

在事件时刻 \(t_i\),个体 \(j\) 恰好发生事件的条件概率为:

\[\frac{h_0(t_i)\exp(x_j^T\beta)}{\sum_{k \in \mathcal{R}(t_i)} h_0(t_i)\exp(x_k^T\beta)} = \frac{\exp(x_j^T\beta)}{\sum_{k \in \mathcal{R}(t_i)}\exp(x_k^T\beta)}\]

\(h_0(t_i)\) 在分子分母中被完美消去!全样本偏似然函数:

\[L(\beta) = \prod_{i=1}^K \frac{\exp(x_{(i)}^T\beta)}{\sum_{k \in \mathcal{R}(t_i)}\exp(x_k^T\beta)}\]

读偏似然时抓三步

  • 先固定某个事件时点 \(t_i\),只比较这一刻真正还在风险集里的个体
  • 再看分子分母共同含有的 \(h_0(t_i)\),理解它为什么会被整齐约掉
  • 最后再把所有事件时点的条件概率连乘,得到只依赖 \(\beta\) 的偏似然函数

从原始数据到风险比结果的最小流程

真正跑一次 Cox 模型时,可以把全过程压成五步:

  1. 先准备好生存时间、事件指示和协变量
  2. 只在真正发生事件的时点上构造风险集
  3. 在每个事件时点计算“该个体在风险集里被选中的条件概率”
  4. 把所有时点的条件概率合成偏似然,并用数值优化估计 \(eta\)
  5. 最后把系数指数化成风险比 $ ext{exp}()$,再进入解释阶段

结果阅读顺序也应固定

  • 先看比例风险假设是否大致合理
  • 再看风险比方向和大小
  • 最后才看显著性、区间和业务含义

准备Cox模型数据:加入财务风险指标

financial_statement_path = os.path.join(DATA_DIR, 'stock/financial_statement.h5')  # 财务报表路径
financial_statement_raw = pd.read_hdf(financial_statement_path)  # 读取财务报表数据

financial_statement_raw['report_period_end'] = pd.PeriodIndex(  # 将季度标识转换为报告期末日期
    financial_statement_raw['quarter'], freq='Q'  # 按季度频率
).to_timestamp(how='end')  # 取季度最后一天

financial_groups = financial_statement_raw.groupby('order_book_id')  # 按股票代码分组

提取上市初期杠杆率

initial_leverage_records = []  # 存储每家公司的初始杠杆率

for _, company_row in company_survival_data.iterrows():  # 遍历每家公司
    stock_id = company_row['order_book_id']  # 股票代码
    if stock_id not in financial_groups.groups:  # 无财报数据则跳过
        continue  # 跳过当前循环
    company_financials = financial_groups.get_group(stock_id).sort_values('report_period_end')  # 按日期排序
    basic_info = stock_basic_data[stock_basic_data['order_book_id'] == stock_id]  # 查找基本信息
    if len(basic_info) == 0:  # 无基本信息则跳过
        continue  # 跳过当前循环
    listed_date = pd.to_datetime(basic_info.iloc[0]['listed_date'])  # 上市日期
    post_listing = company_financials[company_financials['report_period_end'] > listed_date]  # 上市后的财报
    if len(post_listing) > 0:  # 存在上市后财报
        first_report = post_listing.iloc[0]  # 取第一份
        total_assets = first_report.get('total_assets', None)  # 总资产
        total_liab = first_report.get('total_liabilities', None)  # 总负债
        if pd.notna(total_assets) and total_assets > 0 and pd.notna(total_liab):  # 数据有效
            leverage = total_liab / total_assets  # 计算资产负债率
            initial_leverage_records.append({'order_book_id': stock_id, 'Initial_Leverage': leverage})  # 记录

initial_leverage_df = pd.DataFrame(initial_leverage_records)  # 转换为DataFrame
company_survival_data = company_survival_data.merge(initial_leverage_df, on='order_book_id', how='left')  # 合并杠杆率
median_leverage = company_survival_data['Initial_Leverage'].median()  # 计算中位数
company_survival_data['Initial_Leverage'] = company_survival_data['Initial_Leverage'].fillna(median_leverage)  # 填充缺失值

加入地区分类变量

province_mapping = stock_basic_data[['order_book_id', 'province']].drop_duplicates('order_book_id')  # 提取省份映射
company_survival_data = company_survival_data.merge(province_mapping, on='order_book_id', how='left')  # 合并省份

COASTAL_PROVINCES = ['广东省', '浙江省', '江苏省', '上海市', '北京市', '福建省', '山东省', '天津市']  # 沿海省份列表
company_survival_data['Region'] = company_survival_data['province'].apply(  # 生成地区分类
    lambda prov: 'Coastal' if prov in COASTAL_PROVINCES else 'Inland'  # 按省份判断沿海/内陆
)  # 完成分类

# 创建行业分组(取前5大行业)
company_survival_data['Industry_Group'] = company_survival_data['industry'].where(  # 条件赋值
    company_survival_data['industry'].isin(top_industry_categories), other='Other'  # 非前5行业标记为Other
)  # 完成行业分组

print(f'有效样本量: {len(company_survival_data)}')  # 输出样本量
print(f'沿海公司占比: {(company_survival_data["Region"]=="Coastal").mean():.1%}')  # 输出沿海占比
有效样本量: 5334
沿海公司占比: 68.9%

拟合Cox比例风险模型

from lifelines import CoxPHFitter  # 导入Cox模型拟合器

cox_model_data = company_survival_data[['duration', 'event', 'Industry_Group', 'Region', 'Initial_Leverage']].copy()  # 选取建模变量
cox_model_data['Initial_Leverage'] = cox_model_data['Initial_Leverage'].fillna(  # 填充杠杆率缺失
    cox_model_data['Initial_Leverage'].median() if cox_model_data['Initial_Leverage'].median() == cox_model_data['Initial_Leverage'].median() else 0.5  # 若中位数为NaN则用0.5
)  # 完成填充
cox_model_data['Industry_Group'] = cox_model_data['Industry_Group'].fillna('Other')  # 填充行业缺失
cox_model_data['Region'] = cox_model_data['Region'].fillna('Inland')  # 填充地区缺失
cox_model_data = pd.get_dummies(cox_model_data, drop_first=True, dtype=float)  # 转换为哑变量
cox_model_data = cox_model_data.dropna(subset=['duration', 'event'])  # 删除关键列缺失行

cox_proportional_hazards_model = CoxPHFitter(penalizer=0.1)  # 添加L2正则化防止矩阵奇异
cox_proportional_hazards_model.fit(cox_model_data, duration_col='duration', event_col='event')  # 拟合Cox模型
cox_proportional_hazards_model.print_summary()  # 输出模型完整回归结果
Table 1: Cox比例风险模型回归结果
model lifelines.CoxPHFitter
duration col 'duration'
event col 'event'
penalizer 0.1
l1 ratio 0.0
baseline estimation breslow
number of observations 5334
number of events observed 237
partial log-likelihood -1728.45
time fit was run 2026-06-02 14:21:43 UTC
coef exp(coef) se(coef) coef lower 95% coef upper 95% exp(coef) lower 95% exp(coef) upper 95% cmp to z p -log2(p)
Initial_Leverage 0.07 1.08 0.02 0.02 0.12 1.02 1.13 0.00 2.97 <0.005 8.38
Industry_Group_医药 -0.32 0.73 0.13 -0.58 -0.06 0.56 0.95 0.00 -2.37 0.02 5.83
Industry_Group_基础化工 -0.30 0.74 0.13 -0.56 -0.04 0.57 0.96 0.00 -2.24 0.02 5.33
Industry_Group_机械 -0.20 0.82 0.12 -0.45 0.04 0.64 1.04 0.00 -1.63 0.10 3.29
Industry_Group_电子 -0.19 0.83 0.14 -0.47 0.09 0.62 1.09 0.00 -1.34 0.18 2.48
Industry_Group_计算机 -0.22 0.80 0.16 -0.53 0.10 0.59 1.11 0.00 -1.34 0.18 2.47
Region_Inland 0.15 1.17 0.08 0.00 0.30 1.00 1.35 0.00 2.02 0.04 4.52

Concordance 0.66
Partial AIC 3470.89
log-likelihood ratio test 25.86 on 7 df
-log2(p) of ll-ratio test 10.87

Cox模型结果:森林图

Code
fig, ax = plt.subplots(figsize=(10, 6))  # 创建画布
cox_proportional_hazards_model.plot(ax=ax)  # 绘制系数森林图
ax.set_title('Cox模型: 财务困境风险因子', fontsize=14)  # 设置标题
ax.grid(True, alpha=0.3)  # 添加网格线
plt.tight_layout()  # 自动调整布局
plt.show()  # 显示图形
Figure 4: 财务困境风险因子森林图

读 Cox 森林图时先看中线再看区间最后看量级

森林图最容易把学生看乱的地方,不在于图形复杂,而在于大家常常一上来就找变量名或显著性符号。

更稳妥的阅读顺序是:

  1. 先看参考线:如果图上画的是系数,先看 0 线;如果画的是风险比,先看 1 线
  2. 再看点估计落在线的哪一侧:右侧通常表示风险升高,左侧通常表示风险降低
  3. 再看置信区间是否跨线:跨线说明统计证据不够稳定,不跨线说明方向更可信
  4. 最后再看量级大小:离参考线越远,说明效应相对越强,但仍要回到风险比语言而不是误读成概率变化

课堂口令:先看线,再看边;先看稳不稳,再看大不大。森林图不是让你直接挑“最长的一条”,而是让你按“方向 → 不确定性 → 量级”三个层次读结果。

关键发现解读

风险因子 方向 经济解释
初始杠杆率 风险↑ 高负债企业在下行期更易触发退市条件
医药行业 风险↓ 刚性需求和政策扶持提供保护
内陆地区 风险↑ 融资环境和市场化程度的劣势
  • 杠杆率每增加1个单位,退市风险增加约8%
  • 内陆公司退市风险比沿海高约17%

风险函数的定义

风险函数(Hazard Function)衡量在给定已存活到时间 \(t\) 时的瞬时风险率

\[h(t) = \lim_{\Delta t \to 0} \frac{\Pr(t < T \leq t + \Delta t \mid T > t)}{\Delta t}\]

风险函数与生存函数的关系

\[S(t) = \exp\left(-\int_0^t h(u)\,du\right)\]

函数 回答的问题
生存函数 \(S(t)\) 存活到时间 \(t\) 的概率是多少?
风险函数 \(h(t)\) 如果已活到时间 \(t\),下一瞬间发生事件的概率强度?

不同杠杆率分组的风险函数

Code
from lifelines import NelsonAalenFitter  # 导入Nelson-Aalen累积风险估计器

company_survival_data['leverage_group'] = pd.cut(  # 按杠杆率分组
    company_survival_data['Initial_Leverage'],  # 杠杆率变量
    bins=[0, 0.3, 0.6, float('inf')],  # 分组边界
    labels=['低杠杆(0-30%)', '中杠杆(30-60%)', '高杠杆(>60%)']  # 分组标签
)  # 完成分组

fig, ax = plt.subplots(figsize=(10, 6))  # 创建画布
nelson_aalen_fitter = NelsonAalenFitter()  # 实例化NA估计器

for leverage_group in company_survival_data['leverage_group'].dropna().unique():  # 遍历各分组
    mask = company_survival_data['leverage_group'] == leverage_group  # 筛选当前分组
    if mask.sum() < 2:  # 样本不足则跳过
        continue  # 跳过
    nelson_aalen_fitter.fit(  # 拟合累积风险
        company_survival_data.loc[mask, 'duration'],  # 存续时间
        company_survival_data.loc[mask, 'event']  # 事件标记
    )  # 完成拟合
    cumulative_hazard = nelson_aalen_fitter.cumulative_hazard_  # 获取累积风险
    hazard_estimate = cumulative_hazard.diff().fillna(0)  # 差分得到风险函数估计
    hazard_estimate.plot(ax=ax, label=leverage_group)  # 绘制到画布

ax.set_title('不同初始杠杆率分组的退市风险函数', fontsize=14)  # 设置标题
ax.set_xlabel('生存时间(年)', fontsize=12)  # x轴标签
ax.set_ylabel('风险函数 h(t)', fontsize=12)  # y轴标签
ax.grid(True, alpha=0.3)  # 添加网格线
plt.legend(title='杠杆率分组', prop={'family': 'Source Han Serif SC'})  # 图例
plt.tight_layout()  # 自动调整布局
plt.show()  # 显示图形
Figure 5: 不同初始杠杆率分组的风险函数估计

Lasso正则化:自动变量筛选

当特征数量较多时,使用L1(Lasso)正则化自动筛选重要变量。

验证思路:在真实数据中注入5个纯噪声特征,观察正则化路径中信号与噪声的分离。

from sklearn.preprocessing import StandardScaler  # 导入标准化工具

np.random.seed(1)  # 固定随机种子
for i in range(5):  # 注入5个噪声特征
    cox_model_data[f'Noise_{i}'] = np.random.normal(0, 1, len(cox_model_data))  # 标准正态随机噪声

predictor_matrix = cox_model_data.drop(['duration', 'event'], axis=1)  # 提取协变量矩阵
predictor_matrix = predictor_matrix.select_dtypes(include=[np.number])  # 仅保留数值列

scaler = StandardScaler()  # 实例化标准化器
scaled_predictor_matrix = scaler.fit_transform(predictor_matrix)  # 标准化特征
scaled_predictor_df = pd.DataFrame(scaled_predictor_matrix, columns=predictor_matrix.columns)  # 恢复列名
scaled_predictor_df['duration'] = cox_model_data['duration'].values  # 加回存续时间
scaled_predictor_df['event'] = cox_model_data['event'].values  # 加回事件标记

高维生存分析:Lasso-Cox的数学框架

问题:现代金融数据中,潜在的风险因子可能有几百个

Lasso-Cox模型

\[ \max_\beta \left[ \ell(\beta) - \lambda \sum_{j=1}^p |\beta_j| \right] \]

  • \(\ell(\beta)\):偏似然函数的对数
  • \(\lambda\):正则化强度
  • L1惩罚 → 自动变量选择(部分 \(\beta_j\) 被压缩为0)

选择 \(\lambda\):通过交叉验证选择

  • 注意:生存数据的交叉验证需要特殊处理删失

实操建议:使用 glmnetfamily="cox" 选项

Lasso正则化路径

Code
penalty_parameters = np.logspace(-4, 0, 20)  # 生成20个惩罚参数
coefficient_path = []  # 记录系数变化路径
successful_penalties = []  # 记录成功的惩罚参数

for lambda_val in penalty_parameters:  # 遍历每个惩罚参数
    try:  # 捕获可能的收敛错误
        penalized_model = CoxPHFitter(penalizer=lambda_val, l1_ratio=1.0)  # Lasso正则化
        penalized_model.fit(scaled_predictor_df, duration_col='duration', event_col='event')  # 拟合模型
        coefficient_path.append(penalized_model.params_.values)  # 记录系数
        successful_penalties.append(lambda_val)  # 记录成功参数
    except Exception:  # 收敛失败则跳过
        continue  # 跳过

penalty_parameters = np.array(successful_penalties)  # 转为数组
coefficient_path = np.array(coefficient_path)  # 转为二维数组

fig, ax = plt.subplots(figsize=(12, 7))  # 创建画布
for i, col in enumerate(predictor_matrix.columns):  # 遍历每个特征
    ax.plot(penalty_parameters, coefficient_path[:, i], label=col)  # 绘制系数路径

ax.set_xscale('log')  # x轴对数刻度
ax.set_xlabel('Lambda (正则化参数)', fontsize=12)  # x轴标签
ax.set_ylabel('系数值', fontsize=12)  # y轴标签
ax.set_title('Cox模型的Lasso正则化路径', fontsize=14)  # 标题
ax.grid(True, alpha=0.3)  # 网格线
plt.legend(prop={'family': 'Source Han Serif SC'}, fontsize=8)  # 图例
plt.tight_layout()  # 调整布局
plt.show()  # 显示
Figure 6: Cox模型的Lasso正则化路径

生存树:自动发现非线性关系

生存树不需要线性假设,可以自动发现变量间的交互效应

from sksurv.tree import SurvivalTree  # 导入生存树模型
from sksurv.util import Surv  # 导入生存数据结构化工具
from sklearn.inspection import permutation_importance  # 导入置换重要性

survival_targets = Surv.from_arrays(  # 构造生存目标
    event=cox_model_data['event'].astype(bool),  # 事件标记
    time=cox_model_data['duration']  # 存续时间
)  # 完成构造

tree_predictors = cox_model_data.drop(['duration', 'event'], axis=1)  # 特征矩阵

survival_tree_model = SurvivalTree(max_depth=3, min_samples_split=10, min_samples_leaf=5, random_state=42)  # 限制树深防止过拟合
survival_tree_model.fit(tree_predictors, survival_targets)  # 拟合生存树

perm_result = permutation_importance(  # 计算置换重要性
    survival_tree_model, tree_predictors, survival_targets, n_repeats=10, random_state=42  # 重复10次
)  # 完成计算
feature_importance_df = pd.DataFrame({  # 构建重要性表
    'feature': tree_predictors.columns, 'importance': perm_result.importances_mean  # 特征名和重要性
}).sort_values('importance', ascending=False)  # 按重要性降序排列

生存树特征重要性

Code
fig, ax = plt.subplots(figsize=(10, 6))  # 创建画布
top_features = feature_importance_df.head(10)  # 取前10重要特征
ax.barh(range(len(top_features)), top_features['importance'])  # 水平条形图
ax.set_yticks(range(len(top_features)))  # y轴刻度
ax.set_yticklabels(top_features['feature'])  # y轴标签
ax.set_xlabel('特征重要性', fontsize=12)  # x轴标签
ax.set_title('生存树:特征重要性排名', fontsize=14)  # 标题
ax.grid(True, alpha=0.3, axis='x')  # x方向网格线
plt.tight_layout()  # 调整布局
plt.show()  # 显示
Figure 7: 生存树特征重要性排名

C-index 其实在比较’谁更早出事’的排序能力

C-index 的核心不是看模型能否精确预测’具体会活几年’,而是看它能否把样本的风险先后顺序排对

可以把它理解成一场两两比较:

  1. 任取两家公司
  2. 如果其中一家的事件明显更早发生,那么这对样本就是’可比较的’
  3. 模型若把更早出事的那家公司判为风险更高,则这对比较算’排对了’
C-index 数值 直觉含义
0.5 和抛硬币差不多
0.6 到 0.7 能排出一些先后,但还不稳定
0.7 到 0.8 排序能力较强
大于 0.8 排序非常好

为什么它适合生存分析?

  • 它不要求模型给出精确时间点
  • 它更关心风险排序
  • 这种排序目标更容易与删失数据兼容

C-index:模型评估指标

C-index(一致性指数):评估模型预测”谁先发生事件”的能力。

Table 2: 模型C-index评估
from lifelines.utils import concordance_index  # 导入一致性指数

predicted_risk = cox_proportional_hazards_model.predict_partial_hazard(cox_model_data)  # Cox模型风险预测
cox_c_index = concordance_index(  # 计算C-index
    cox_model_data['duration'], -predicted_risk, cox_model_data['event']  # 风险取负(风险高→生存短)
)  # 完成计算

print(f'Cox比例风险模型 C-index: {cox_c_index:.4f}')  # 输出C-index
Cox比例风险模型 C-index: 0.6565
C-index 范围 含义
0.5 随机猜测(抛硬币)
0.6-0.7 中等预测能力
0.7-0.8 较强预测能力
>0.8 强预测能力

生存分析的金融应用:回撤恢复时间

将生存分析应用于股价回撤恢复时间(Time to Recovery):

  • 事件:股价创新高(解套/恢复)
  • 时间:从上一次创新高到下一次创新高
  • 删失:截至目前仍未创新高(仍在套牢中)

分析海康威视(002415.XSHE)的历史回撤恢复情况。

识别回撤恢复周期

stock_price_history = pd.read_hdf(  # 读取后复权股价数据
    os.path.join(DATA_DIR, 'stock/stock_price_post_adjusted.h5')  # 后复权数据路径
).reset_index()  # 重置MultiIndex

haikang_data = stock_price_history[stock_price_history['order_book_id'] == '002415.XSHE'].copy()  # 筛选海康威视
haikang_data = haikang_data.sort_values('date').set_index('date')  # 按日期排序
closing_prices = haikang_data['close']  # 提取收盘价

running_max = closing_prices.cummax()  # 计算历史累计最高价
is_new_high = (closing_prices >= running_max)  # 标记创新高日期
new_high_dates = closing_prices.index[is_new_high]  # 提取新高日期

recovery_durations = []  # 存储回撤持续天数
recovery_events = []  # 存储事件标记

if len(new_high_dates) > 1:  # 至少需要两个新高
    for i in range(len(new_high_dates) - 1):  # 遍历相邻新高
        duration_days = (new_high_dates[i+1] - new_high_dates[i]).days  # 间隔天数
        if duration_days > 5:  # 仅保留显著回撤
            recovery_durations.append(duration_days)  # 记录天数
            recovery_events.append(1)  # 已恢复
    last_gap = (closing_prices.index[-1] - new_high_dates[-1]).days  # 最后一段
    if last_gap > 5:  # 当前仍在回撤中
        recovery_durations.append(last_gap)  # 记录天数
        recovery_events.append(0)  # 删失(未恢复)

drawdown_data = pd.DataFrame({'duration': recovery_durations, 'event': recovery_events})  # 构建数据框
print(f'识别到 {len(drawdown_data)} 次显著回撤周期')  # 输出总数
识别到 38 次显著回撤周期

回撤恢复时间的KM生存曲线

Code
km_recovery = KaplanMeierFitter()  # 实例化KM估计器
km_recovery.fit(drawdown_data['duration'], drawdown_data['event'], label='回撤恢复')  # 拟合生存曲线

fig, ax = plt.subplots(figsize=(10, 6))  # 创建画布
km_recovery.plot(ax=ax)  # 绘制生存曲线

median_recovery = km_recovery.median_survival_time_  # 获取中位恢复时间
plt.axvline(median_recovery, color='r', linestyle='--', label=f'中位恢复时间: {median_recovery:.0f}天')  # 参考线

ax.set_title('海康威视:股价回撤恢复时间生存曲线', fontsize=14)  # 标题
ax.set_xlabel('天数', fontsize=12)  # x轴标签
ax.set_ylabel('未恢复概率', fontsize=12)  # y轴标签
ax.grid(True, alpha=0.3)  # 网格线
plt.legend(prop={'family': 'Source Han Serif SC'})  # 图例
plt.tight_layout()  # 调整布局
plt.show()  # 显示

prob_30d = km_recovery.predict(30)  # 30天未恢复概率
prob_180d = km_recovery.predict(180)  # 180天未恢复概率
print(f'30天内无法恢复的概率: {prob_30d:.1%}')  # 输出30天概率
print(f'半年(180天)内无法恢复的概率: {prob_180d:.1%}')  # 输出180天概率
Figure 8: 海康威视股价回撤恢复时间的生存分析
30天内无法恢复的概率: 44.7%
半年(180天)内无法恢复的概率: 13.2%

生存分析方法体系总结

方法 类型 核心用途
Kaplan-Meier 非参数 估计生存曲线
对数秩检验 假设检验 比较两组/多组生存曲线
Cox 比例风险 半参数 量化协变量对风险的影响
生存树 非参数 发现非线性关系和交互效应
C-index 评估指标 衡量模型预测排序能力

关键假设

  • 独立删失:删失机制不携带事件信息
  • 比例风险(Cox模型):风险比不随时间变化

默认工作流:先用 KM 看整体曲线,再用对数秩或 Cox 回答比较与解释问题;只有当线性、比例风险或低维假设明显不够时,再转向树模型或 Lasso-Cox。

最小路线图

  • 先画 KM:确认事件大致集中在哪些时间段、删失比例大概有多高。
  • 再做 log-rank 或 Cox:回答’组间是否不同’与’哪些因素在推高风险’。
  • 最后才升级到树模型或 Lasso-Cox:前提是线性关系、比例风险假设或低维设定已经明显不够。

生存分析方法选择指南

根据研究问题选择方法

研究问题 推荐方法 适用条件
估计总体生存曲线 KM估计 无协变量
比较两组生存差异 对数秩检验 分组变量为分类型
分析多因素影响 Cox模型 比例风险假设成立
高维变量选择 Lasso-Cox 特征多,需稀疏解
捕捉非线性效应 生存树/RSF 特征间有交互作用

生存分析在金融监管中的角色

  • 巴塞尔协议(Basel III)中的PD估计(违约概率)
  • 信用评级迁移矩阵
  • 保险精算中的生命表

结果阅读顺序:先确认删失机制与时间起点定义是否合理,再看方法是否匹配研究问题,最后才看 C-index、风险比和显著性结果。

底线提醒:C-index 高说明排序能力强,不等于违约概率或退市概率已经校准得很好;排序和校准是两件不同的事。