核心问题:如何分析”等待时间”?
生存分析(Survival Analysis)研究从某个起始时间到某个事件发生所经历的时间。
- 医学:患者确诊后的存活时间
- 商业:客户首次购买后的流失时间
- 工程:设备投产后的故障时间
- 金融:上市公司从IPO到退市的存续时间
关键挑战:数据中存在删失(Censoring)——在观察结束时,事件尚未发生。
为什么删失样本不能直接丢掉?
如果把所有删失样本都删除,会同时带来两个严重问题:
- 样本量骤降:大量仍在存续的公司会被浪费掉
- 幸存信息丢失:‘至少活到今天’ 这部分信息被忽略
- 估计系统性偏短:只看已退市公司,会错误地觉得企业都’死得很快’
| 只保留已退市公司 |
平均存续期被低估 |
因为长期存活者被整体删掉 |
| 把未退市公司当作’永不退市’ |
生存概率被高估 |
因为未来仍可能发生事件 |
| 显式建模删失 |
同时利用完整样本与删失样本 |
这是生存分析的正确思路 |
一句话记忆:删失样本不是’没用的数据’,而是’只知道下界的信息’。
生存分析与传统方法的本质区别
传统分类:公司会不会退市?(Yes/No)
生存分析:公司什么时候退市?退市风险如何随时间变化?
| 目标变量 |
二元(0/1) |
时间 + 事件 |
| 信息利用 |
观测期末状态 |
全过程信息 |
| 处理删失 |
不能 |
核心能力 |
| 输出 |
概率 |
时间相关概率曲线 |
金融应用场景:
- 信用债违约时间预测
- IPO后多久达到峰值
- 基金存续期分析
- 投资组合回撤恢复时间
生存时间与删失时间
设 \(T\) 为真实事件时间(如退市时间),\(C\) 为删失时间(如研究截止时间)。
我们实际能观测到的是:
\[Y = \min(T, C)\]
以及事件指示变量:
\[\delta = \begin{cases} 1, & T \leq C \quad (\text{事件已发生}) \\ 0, & T > C \quad (\text{被删失}) \end{cases}\]
直觉理解:假设你追踪了100家上市公司,5年后有10家退市(\(\delta=1\)),90家仍在交易(\(\delta=0\),被删失)。你不能忽略这90家——它们贡献了”至少存活5年”的信息。
删失的数学处理
观测数据的表示:
每个观测 \(i\) 包含:
- \(T_i\):观测到的时间(事件时间或删失时间)
- \(\delta_i\):事件指示变量
- \(\delta_i = 1\):事件发生(未删失)
- \(\delta_i = 0\):删失(观测截止时事件未发生)
关键假设:独立删失
\[
P(\text{event at } t \mid \text{censored at } t) = P(\text{event at } t)
\]
即:删失的发生不能与事件相关
违反示例:如果高风险客户更容易提前还清贷款(逃避监控) → 删失与违约相关 → 分析结果有偏
删失类型:右删失、左删失与区间删失
| 右删失 |
只知道 \(T > C\) |
研究截止时公司仍未退市 |
| 左删失 |
只知道 \(T < L\) |
只知道公司在数据收集前已违约 |
| 区间删失 |
只知道 \(T \in (L, R]\) |
只知道违约发生在两次财报之间 |
- 实际分析中,右删失最为常见
- 独立删失假设:删失机制不能携带关于事件时间的信息
风险集:每个时点到底在和谁比较?
风险集(Risk Set) 指的是在时点 \(t\) 到来之前,仍然有可能发生事件的个体集合。
对公司退市案例来说,时点 \(t\) 的风险集包括:
- 已经上市
- 在 \(t\) 时刻之前还没有退市
- 在 \(t\) 时刻之前也没有因为观察结束而退出样本
| 已在更早时点退市 |
否 |
已经发生事件 |
| 在更早时点被删失 |
否 |
后续不再可观测 |
| 到 \(t\) 时刻仍存续 |
是 |
仍然’暴露在风险中’ |
为什么这个概念重要?
- Kaplan-Meier 要用它计算条件生存概率
- 对数秩检验要用它计算期望事件数
- Cox 模型的偏似然也完全建立在风险集之上
生存函数与Kaplan-Meier估计量
生存函数 \(S(t)\) 定义为事件时间超过 \(t\) 的概率:
\[S(t) = \Pr(T > t)\]
Kaplan-Meier 乘积极限估计量:
\[\hat{S}(t) = \prod_{t_j \leq t} \left(1 - \frac{d_j}{n_j}\right)\]
- \(t_j\):第 \(j\) 个事件发生时间
- \(d_j\):在 \(t_j\) 时刻发生的事件数
- \(n_j\):在 \(t_j\) 时刻仍处于”风险集”中的个体数
读 KM 公式的顺序:
- 先找风险集 \(n_j\):此刻还有谁真正’在场’
- 再看事件数 \(d_j\):这一关到底退掉了多少个体
- 最后把每一段 \(1-d_j/n_j\) 连乘:整体生存概率就是一路过关的累计结果
案例:A股上市公司退市生存分析
我们使用本地 stock_basic_data.h5 数据,分析A股上市公司从上市到退市的生存情况。
import scipy.integrate # 导入scipy积分模块以添加向后兼容性补丁
scipy.integrate.trapz = scipy.integrate.trapezoid # 为旧版lifelines添加trapz别名(scipy 1.14+已移除trapz)
import pandas as pd # 导入pandas用于数据处理
import numpy as np # 导入numpy用于数值计算
import matplotlib.pyplot as plt # 导入matplotlib用于可视化
import os # 导入os模块用于跨平台路径
plt.rcParams['font.family'] = ['Source Han Serif SC', 'Noto Serif CJK SC', 'SimSun'] # 设置中文字体优先级
plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
DATA_DIR = 'C:/qiufei/data' if os.name == 'nt' else '/home/ubuntu/r2_data_mount/data' # 跨平台数据路径
stock_basic_data = pd.read_hdf(os.path.join(DATA_DIR, 'stock/stock_basic_data.h5')) # 读取上市公司基本信息
stock_basic_data['listed_date'] = pd.to_datetime(stock_basic_data['listed_date'], errors='coerce') # 转换上市日期格式
stock_basic_data['de_listed_date'] = pd.to_datetime(stock_basic_data['de_listed_date'], errors='coerce') # 转换退市日期格式
stock_basic_data = stock_basic_data.dropna(subset=['listed_date']) # 去除缺少上市日期的无效记录
构建生存数据集
observation_end_date = pd.to_datetime('2024-01-01') # 设定研究观测截止日期
company_survival_records = [] # 初始化列表存储生存记录
for _, company_row in stock_basic_data.iterrows(): # 遍历每家上市公司
listed_date = company_row['listed_date'] # 获取上市日期
delisted_date = company_row['de_listed_date'] # 获取退市日期
if pd.isna(delisted_date) or delisted_date >= observation_end_date: # 未退市视为删失
duration_years = (observation_end_date - listed_date).days / 365.25 # 计算存续年数
event_indicator = 0 # 标记为删失(未退市)
else: # 已退市视为事件发生
duration_years = (delisted_date - listed_date).days / 365.25 # 计算存续年数
event_indicator = 1 # 标记为事件(已退市)
if duration_years > 0: # 排除异常负值记录
company_survival_records.append({ # 添加生存记录
'order_book_id': company_row['order_book_id'], # 股票代码
'industry': company_row.get('citics_2019_l1_name', 'Unknown'), # 中信一级行业
'duration': duration_years, # 存续时间(年)
'event': event_indicator # 事件标记
}) # 完成记录添加
company_survival_data = pd.DataFrame(company_survival_records) # 转换为DataFrame
print(f'总样本量: {len(company_survival_data)}') # 输出总样本量
print(f'退市公司数: {company_survival_data["event"].sum():.0f}') # 输出退市数量
print(f'退市率: {company_survival_data["event"].mean():.2%}') # 输出退市比例
总样本量: 5334
退市公司数: 237
退市率: 4.44%
A股整体退市生存曲线
Code
from lifelines import KaplanMeierFitter # 导入Kaplan-Meier生存曲线估计器
kaplan_meier_fitter = KaplanMeierFitter() # 实例化KM估计器
kaplan_meier_fitter.fit( # 拟合整体生存曲线
company_survival_data['duration'], # 存续时间
company_survival_data['event'], # 事件标记
label='A股上市公司整体' # 图例标签
) # 完成拟合
fig, ax = plt.subplots(figsize=(10, 6)) # 创建画布
kaplan_meier_fitter.plot(ax=ax) # 绘制KM生存曲线
ax.set_title('A股上市公司退市生存曲线', fontsize=14) # 设置图标题
ax.set_xlabel('上市年数', fontsize=12) # 设置x轴标签
ax.set_ylabel('生存概率', fontsize=12) # 设置y轴标签
ax.grid(True, alpha=0.3) # 添加网格线
plt.tight_layout() # 自动调整布局
plt.show() # 显示图形
按行业分组的生存曲线
Code
industry_counts = company_survival_data['industry'].value_counts() # 统计各行业样本量
top_industry_categories = industry_counts.head(5).index.tolist() # 选取前5大行业
fig, ax = plt.subplots(figsize=(10, 6)) # 创建画布
for ind in top_industry_categories: # 遍历前5大行业
mask = company_survival_data['industry'] == ind # 筛选当前行业
kaplan_meier_fitter.fit( # 拟合当前行业KM曲线
company_survival_data.loc[mask, 'duration'], # 存续时间
company_survival_data.loc[mask, 'event'], # 事件标记
label=ind # 行业名称作为图例
) # 完成拟合
kaplan_meier_fitter.plot(ax=ax) # 绘制到同一画布
ax.set_title('不同行业A股的退市生存曲线比较', fontsize=14) # 设置图标题
ax.set_xlabel('上市年数', fontsize=12) # x轴标签
ax.grid(True, alpha=0.3) # 添加网格线
plt.legend(prop={'family': 'Source Han Serif SC'}) # 设置图例中文字体
plt.tight_layout() # 自动调整布局
plt.show() # 显示图形
对数秩检验:行业间差异是否显著?
仅靠”目测”曲线分离来下结论是危险的——我们需要对数秩检验(Log-Rank Test)。
Code
from lifelines.statistics import logrank_test # 导入对数秩检验
industry_first = top_industry_categories[0] # 样本量最大的行业
industry_second = top_industry_categories[1] # 样本量第二大的行业
mask_first = company_survival_data['industry'] == industry_first # 第一行业掩码
mask_second = company_survival_data['industry'] == industry_second # 第二行业掩码
rank_test_results = logrank_test( # 执行对数秩检验
company_survival_data.loc[mask_first, 'duration'], # 行业1存续时间
company_survival_data.loc[mask_second, 'duration'], # 行业2存续时间
company_survival_data.loc[mask_first, 'event'], # 行业1事件标记
company_survival_data.loc[mask_second, 'event'] # 行业2事件标记
) # 完成检验
print(f'比较行业: {industry_first} vs {industry_second}') # 输出比较的行业名
print(f'对数秩检验统计量: {rank_test_results.test_statistic:.4f}') # 输出检验统计量
print(f'p值: {rank_test_results.p_value:.4f}') # 输出p值
对数秩检验的数学原理
研究问题:两组的生存曲线是否有显著差异?
\[
H_0: S_1(t) = S_2(t) \quad \forall t
\]
检验统计量:
\[
W = \frac{\sum_{j=1}^K (d_{1j} - E_{1j})}{\sqrt{\sum_{j=1}^K V_{1j}}}
\]
其中:
- \(d_{1j}\):第1组在 \(t_j\) 时刻的实际事件数
- \(E_{1j} = n_{1j} \cdot \frac{d_j}{n_j}\):期望事件数
- 大样本下 \(W \sim N(0,1)\)
金融解读:比较不同行业板块的退市速度差异,或不同信用等级的违约速度差异
Cox模型:量化风险因素的影响
Cox 比例风险模型将风险函数分解为两部分:
\[h(t|x_i) = h_0(t) \exp(x_i^T \beta)\]
- \(h_0(t)\):基线风险函数(所有协变量为0时的基准风险)
- \(\exp(x_i^T \beta)\):相对于基准的风险乘数
革命性设计:估计 \(\beta\) 时完全不需要假设 \(h_0(t)\) 的具体形式!
比例风险假设到底是什么意思?
所谓比例风险(Proportional Hazards),不是说各组风险不变,而是说:
两组之间的风险倍数大致保持稳定。
例如:
- 低杠杆公司在第 2 年的瞬时退市风险是 1%
- 高杠杆公司在第 2 年的瞬时退市风险是 1.5%
- 如果比例风险假设成立,那么到了第 6 年,二者也应大致保持 1.5 倍左右的关系
| ‘风险率是常数’ |
否 |
| ‘风险随时间可以变化’ |
是 |
| ‘不同组之间的风险倍数不随时间大幅改变’ |
是,这才是比例风险假设 |
因此:Cox 模型允许时间趋势复杂变化,但要求组间相对差异保持稳定。
Cox模型的核心假设与解读
Cox模型将风险率建模为基线风险与协变量效应的乘积:
\[
h(t|X) = h_0(t) \cdot \exp(\beta_1 X_1 + \cdots + \beta_p X_p)
\]
风险比的解读:
| \(= 1\) |
变量 \(X_j\) 对风险无影响 |
| \(> 1\) |
\(X_j\) 增加1单位,风险增加 \((\exp(\beta_j)-1)\times 100\%\) |
| \(< 1\) |
\(X_j\) 增加1单位,风险降低 |
比例风险假设:
\[
\frac{h(t|X_a)}{h(t|X_b)} = \text{常数(不随时间变化)}
\]
检验方法:Schoenfeld残差检验
风险比不是’概率比’,也不是’时间比’
读 Cox 结果时,最常见的误读有两个:
- 把风险比当成’违约概率翻了多少倍’
- 把风险比当成’存活时间缩短了多少倍’
这两种说法都不严格。
| 风险比 HR |
在同一时点上,谁的瞬时风险更高? |
| 概率差/概率比 |
到某个截止时间前,谁更可能发生事件? |
| 时间比 |
两组平均能活多久的比例是多少? |
因此:当 \(HR = 1.3\) 时,更准确的说法是:
在其他条件相同的情况下,该变量对应的个体在任一给定时点上的事件发生强度约高出 30%。
偏似然函数:基线风险函数的巧妙消去
补充说明:偏似然函数推导
在事件时刻 \(t_i\),个体 \(j\) 恰好发生事件的条件概率为:
\[\frac{h_0(t_i)\exp(x_j^T\beta)}{\sum_{k \in \mathcal{R}(t_i)} h_0(t_i)\exp(x_k^T\beta)} = \frac{\exp(x_j^T\beta)}{\sum_{k \in \mathcal{R}(t_i)}\exp(x_k^T\beta)}\]
\(h_0(t_i)\) 在分子分母中被完美消去!全样本偏似然函数:
\[L(\beta) = \prod_{i=1}^K \frac{\exp(x_{(i)}^T\beta)}{\sum_{k \in \mathcal{R}(t_i)}\exp(x_k^T\beta)}\]
读偏似然时抓三步:
- 先固定某个事件时点 \(t_i\),只比较这一刻真正还在风险集里的个体
- 再看分子分母共同含有的 \(h_0(t_i)\),理解它为什么会被整齐约掉
- 最后再把所有事件时点的条件概率连乘,得到只依赖 \(\beta\) 的偏似然函数
准备Cox模型数据:加入财务风险指标
financial_statement_path = os.path.join(DATA_DIR, 'stock/financial_statement.h5') # 财务报表路径
financial_statement_raw = pd.read_hdf(financial_statement_path) # 读取财务报表数据
financial_statement_raw['report_period_end'] = pd.PeriodIndex( # 将季度标识转换为报告期末日期
financial_statement_raw['quarter'], freq='Q' # 按季度频率
).to_timestamp(how='end') # 取季度最后一天
financial_groups = financial_statement_raw.groupby('order_book_id') # 按股票代码分组
提取上市初期杠杆率
initial_leverage_records = [] # 存储每家公司的初始杠杆率
for _, company_row in company_survival_data.iterrows(): # 遍历每家公司
stock_id = company_row['order_book_id'] # 股票代码
if stock_id not in financial_groups.groups: # 无财报数据则跳过
continue # 跳过当前循环
company_financials = financial_groups.get_group(stock_id).sort_values('report_period_end') # 按日期排序
basic_info = stock_basic_data[stock_basic_data['order_book_id'] == stock_id] # 查找基本信息
if len(basic_info) == 0: # 无基本信息则跳过
continue # 跳过当前循环
listed_date = pd.to_datetime(basic_info.iloc[0]['listed_date']) # 上市日期
post_listing = company_financials[company_financials['report_period_end'] > listed_date] # 上市后的财报
if len(post_listing) > 0: # 存在上市后财报
first_report = post_listing.iloc[0] # 取第一份
total_assets = first_report.get('total_assets', None) # 总资产
total_liab = first_report.get('total_liabilities', None) # 总负债
if pd.notna(total_assets) and total_assets > 0 and pd.notna(total_liab): # 数据有效
leverage = total_liab / total_assets # 计算资产负债率
initial_leverage_records.append({'order_book_id': stock_id, 'Initial_Leverage': leverage}) # 记录
initial_leverage_df = pd.DataFrame(initial_leverage_records) # 转换为DataFrame
company_survival_data = company_survival_data.merge(initial_leverage_df, on='order_book_id', how='left') # 合并杠杆率
median_leverage = company_survival_data['Initial_Leverage'].median() # 计算中位数
company_survival_data['Initial_Leverage'] = company_survival_data['Initial_Leverage'].fillna(median_leverage) # 填充缺失值
加入地区分类变量
province_mapping = stock_basic_data[['order_book_id', 'province']].drop_duplicates('order_book_id') # 提取省份映射
company_survival_data = company_survival_data.merge(province_mapping, on='order_book_id', how='left') # 合并省份
COASTAL_PROVINCES = ['广东省', '浙江省', '江苏省', '上海市', '北京市', '福建省', '山东省', '天津市'] # 沿海省份列表
company_survival_data['Region'] = company_survival_data['province'].apply( # 生成地区分类
lambda prov: 'Coastal' if prov in COASTAL_PROVINCES else 'Inland' # 按省份判断沿海/内陆
) # 完成分类
# 创建行业分组(取前5大行业)
company_survival_data['Industry_Group'] = company_survival_data['industry'].where( # 条件赋值
company_survival_data['industry'].isin(top_industry_categories), other='Other' # 非前5行业标记为Other
) # 完成行业分组
print(f'有效样本量: {len(company_survival_data)}') # 输出样本量
print(f'沿海公司占比: {(company_survival_data["Region"]=="Coastal").mean():.1%}') # 输出沿海占比
有效样本量: 5334
沿海公司占比: 68.9%
拟合Cox比例风险模型
from lifelines import CoxPHFitter # 导入Cox模型拟合器
cox_model_data = company_survival_data[['duration', 'event', 'Industry_Group', 'Region', 'Initial_Leverage']].copy() # 选取建模变量
cox_model_data['Initial_Leverage'] = cox_model_data['Initial_Leverage'].fillna( # 填充杠杆率缺失
cox_model_data['Initial_Leverage'].median() if cox_model_data['Initial_Leverage'].median() == cox_model_data['Initial_Leverage'].median() else 0.5 # 若中位数为NaN则用0.5
) # 完成填充
cox_model_data['Industry_Group'] = cox_model_data['Industry_Group'].fillna('Other') # 填充行业缺失
cox_model_data['Region'] = cox_model_data['Region'].fillna('Inland') # 填充地区缺失
cox_model_data = pd.get_dummies(cox_model_data, drop_first=True, dtype=float) # 转换为哑变量
cox_model_data = cox_model_data.dropna(subset=['duration', 'event']) # 删除关键列缺失行
cox_proportional_hazards_model = CoxPHFitter(penalizer=0.1) # 添加L2正则化防止矩阵奇异
cox_proportional_hazards_model.fit(cox_model_data, duration_col='duration', event_col='event') # 拟合Cox模型
cox_proportional_hazards_model.print_summary() # 输出模型完整回归结果
Cox模型结果:森林图
Code
fig, ax = plt.subplots(figsize=(10, 6)) # 创建画布
cox_proportional_hazards_model.plot(ax=ax) # 绘制系数森林图
ax.set_title('Cox模型: 财务困境风险因子', fontsize=14) # 设置标题
ax.grid(True, alpha=0.3) # 添加网格线
plt.tight_layout() # 自动调整布局
plt.show() # 显示图形
关键发现解读
| 初始杠杆率 |
风险↑ |
高负债企业在下行期更易触发退市条件 |
| 医药行业 |
风险↓ |
刚性需求和政策扶持提供保护 |
| 内陆地区 |
风险↑ |
融资环境和市场化程度的劣势 |
- 杠杆率每增加1个单位,退市风险增加约8%
- 内陆公司退市风险比沿海高约17%
风险函数的定义
风险函数(Hazard Function)衡量在给定已存活到时间 \(t\) 时的瞬时风险率:
\[h(t) = \lim_{\Delta t \to 0} \frac{\Pr(t < T \leq t + \Delta t \mid T > t)}{\Delta t}\]
风险函数与生存函数的关系:
\[S(t) = \exp\left(-\int_0^t h(u)\,du\right)\]
| 生存函数 \(S(t)\) |
存活到时间 \(t\) 的概率是多少? |
| 风险函数 \(h(t)\) |
如果已活到时间 \(t\),下一瞬间发生事件的概率强度? |
不同杠杆率分组的风险函数
Code
from lifelines import NelsonAalenFitter # 导入Nelson-Aalen累积风险估计器
company_survival_data['leverage_group'] = pd.cut( # 按杠杆率分组
company_survival_data['Initial_Leverage'], # 杠杆率变量
bins=[0, 0.3, 0.6, float('inf')], # 分组边界
labels=['低杠杆(0-30%)', '中杠杆(30-60%)', '高杠杆(>60%)'] # 分组标签
) # 完成分组
fig, ax = plt.subplots(figsize=(10, 6)) # 创建画布
nelson_aalen_fitter = NelsonAalenFitter() # 实例化NA估计器
for leverage_group in company_survival_data['leverage_group'].dropna().unique(): # 遍历各分组
mask = company_survival_data['leverage_group'] == leverage_group # 筛选当前分组
if mask.sum() < 2: # 样本不足则跳过
continue # 跳过
nelson_aalen_fitter.fit( # 拟合累积风险
company_survival_data.loc[mask, 'duration'], # 存续时间
company_survival_data.loc[mask, 'event'] # 事件标记
) # 完成拟合
cumulative_hazard = nelson_aalen_fitter.cumulative_hazard_ # 获取累积风险
hazard_estimate = cumulative_hazard.diff().fillna(0) # 差分得到风险函数估计
hazard_estimate.plot(ax=ax, label=leverage_group) # 绘制到画布
ax.set_title('不同初始杠杆率分组的退市风险函数', fontsize=14) # 设置标题
ax.set_xlabel('生存时间(年)', fontsize=12) # x轴标签
ax.set_ylabel('风险函数 h(t)', fontsize=12) # y轴标签
ax.grid(True, alpha=0.3) # 添加网格线
plt.legend(title='杠杆率分组', prop={'family': 'Source Han Serif SC'}) # 图例
plt.tight_layout() # 自动调整布局
plt.show() # 显示图形
Lasso正则化:自动变量筛选
当特征数量较多时,使用L1(Lasso)正则化自动筛选重要变量。
验证思路:在真实数据中注入5个纯噪声特征,观察正则化路径中信号与噪声的分离。
from sklearn.preprocessing import StandardScaler # 导入标准化工具
np.random.seed(1) # 固定随机种子
for i in range(5): # 注入5个噪声特征
cox_model_data[f'Noise_{i}'] = np.random.normal(0, 1, len(cox_model_data)) # 标准正态随机噪声
predictor_matrix = cox_model_data.drop(['duration', 'event'], axis=1) # 提取协变量矩阵
predictor_matrix = predictor_matrix.select_dtypes(include=[np.number]) # 仅保留数值列
scaler = StandardScaler() # 实例化标准化器
scaled_predictor_matrix = scaler.fit_transform(predictor_matrix) # 标准化特征
scaled_predictor_df = pd.DataFrame(scaled_predictor_matrix, columns=predictor_matrix.columns) # 恢复列名
scaled_predictor_df['duration'] = cox_model_data['duration'].values # 加回存续时间
scaled_predictor_df['event'] = cox_model_data['event'].values # 加回事件标记
高维生存分析:Lasso-Cox的数学框架
问题:现代金融数据中,潜在的风险因子可能有几百个
Lasso-Cox模型:
\[
\max_\beta \left[ \ell(\beta) - \lambda \sum_{j=1}^p |\beta_j| \right]
\]
- \(\ell(\beta)\):偏似然函数的对数
- \(\lambda\):正则化强度
- L1惩罚 → 自动变量选择(部分 \(\beta_j\) 被压缩为0)
选择 \(\lambda\):通过交叉验证选择
实操建议:使用 glmnet 的 family="cox" 选项
Lasso正则化路径
Code
penalty_parameters = np.logspace(-4, 0, 20) # 生成20个惩罚参数
coefficient_path = [] # 记录系数变化路径
successful_penalties = [] # 记录成功的惩罚参数
for lambda_val in penalty_parameters: # 遍历每个惩罚参数
try: # 捕获可能的收敛错误
penalized_model = CoxPHFitter(penalizer=lambda_val, l1_ratio=1.0) # Lasso正则化
penalized_model.fit(scaled_predictor_df, duration_col='duration', event_col='event') # 拟合模型
coefficient_path.append(penalized_model.params_.values) # 记录系数
successful_penalties.append(lambda_val) # 记录成功参数
except Exception: # 收敛失败则跳过
continue # 跳过
penalty_parameters = np.array(successful_penalties) # 转为数组
coefficient_path = np.array(coefficient_path) # 转为二维数组
fig, ax = plt.subplots(figsize=(12, 7)) # 创建画布
for i, col in enumerate(predictor_matrix.columns): # 遍历每个特征
ax.plot(penalty_parameters, coefficient_path[:, i], label=col) # 绘制系数路径
ax.set_xscale('log') # x轴对数刻度
ax.set_xlabel('Lambda (正则化参数)', fontsize=12) # x轴标签
ax.set_ylabel('系数值', fontsize=12) # y轴标签
ax.set_title('Cox模型的Lasso正则化路径', fontsize=14) # 标题
ax.grid(True, alpha=0.3) # 网格线
plt.legend(prop={'family': 'Source Han Serif SC'}, fontsize=8) # 图例
plt.tight_layout() # 调整布局
plt.show() # 显示
生存树:自动发现非线性关系
生存树不需要线性假设,可以自动发现变量间的交互效应。
from sksurv.tree import SurvivalTree # 导入生存树模型
from sksurv.util import Surv # 导入生存数据结构化工具
from sklearn.inspection import permutation_importance # 导入置换重要性
survival_targets = Surv.from_arrays( # 构造生存目标
event=cox_model_data['event'].astype(bool), # 事件标记
time=cox_model_data['duration'] # 存续时间
) # 完成构造
tree_predictors = cox_model_data.drop(['duration', 'event'], axis=1) # 特征矩阵
survival_tree_model = SurvivalTree(max_depth=3, min_samples_split=10, min_samples_leaf=5, random_state=42) # 限制树深防止过拟合
survival_tree_model.fit(tree_predictors, survival_targets) # 拟合生存树
perm_result = permutation_importance( # 计算置换重要性
survival_tree_model, tree_predictors, survival_targets, n_repeats=10, random_state=42 # 重复10次
) # 完成计算
feature_importance_df = pd.DataFrame({ # 构建重要性表
'feature': tree_predictors.columns, 'importance': perm_result.importances_mean # 特征名和重要性
}).sort_values('importance', ascending=False) # 按重要性降序排列
生存树特征重要性
Code
fig, ax = plt.subplots(figsize=(10, 6)) # 创建画布
top_features = feature_importance_df.head(10) # 取前10重要特征
ax.barh(range(len(top_features)), top_features['importance']) # 水平条形图
ax.set_yticks(range(len(top_features))) # y轴刻度
ax.set_yticklabels(top_features['feature']) # y轴标签
ax.set_xlabel('特征重要性', fontsize=12) # x轴标签
ax.set_title('生存树:特征重要性排名', fontsize=14) # 标题
ax.grid(True, alpha=0.3, axis='x') # x方向网格线
plt.tight_layout() # 调整布局
plt.show() # 显示
C-index:模型评估指标
C-index(一致性指数):评估模型预测”谁先发生事件”的能力。
| 0.5 |
随机猜测(抛硬币) |
| 0.6-0.7 |
中等预测能力 |
| 0.7-0.8 |
较强预测能力 |
| >0.8 |
强预测能力 |
生存分析的金融应用:回撤恢复时间
将生存分析应用于股价回撤恢复时间(Time to Recovery):
- 事件:股价创新高(解套/恢复)
- 时间:从上一次创新高到下一次创新高
- 删失:截至目前仍未创新高(仍在套牢中)
分析海康威视(002415.XSHE)的历史回撤恢复情况。
识别回撤恢复周期
stock_price_history = pd.read_hdf( # 读取后复权股价数据
os.path.join(DATA_DIR, 'stock/stock_price_post_adjusted.h5') # 后复权数据路径
).reset_index() # 重置MultiIndex
haikang_data = stock_price_history[stock_price_history['order_book_id'] == '002415.XSHE'].copy() # 筛选海康威视
haikang_data = haikang_data.sort_values('date').set_index('date') # 按日期排序
closing_prices = haikang_data['close'] # 提取收盘价
running_max = closing_prices.cummax() # 计算历史累计最高价
is_new_high = (closing_prices >= running_max) # 标记创新高日期
new_high_dates = closing_prices.index[is_new_high] # 提取新高日期
recovery_durations = [] # 存储回撤持续天数
recovery_events = [] # 存储事件标记
if len(new_high_dates) > 1: # 至少需要两个新高
for i in range(len(new_high_dates) - 1): # 遍历相邻新高
duration_days = (new_high_dates[i+1] - new_high_dates[i]).days # 间隔天数
if duration_days > 5: # 仅保留显著回撤
recovery_durations.append(duration_days) # 记录天数
recovery_events.append(1) # 已恢复
last_gap = (closing_prices.index[-1] - new_high_dates[-1]).days # 最后一段
if last_gap > 5: # 当前仍在回撤中
recovery_durations.append(last_gap) # 记录天数
recovery_events.append(0) # 删失(未恢复)
drawdown_data = pd.DataFrame({'duration': recovery_durations, 'event': recovery_events}) # 构建数据框
print(f'识别到 {len(drawdown_data)} 次显著回撤周期') # 输出总数
回撤恢复时间的KM生存曲线
Code
km_recovery = KaplanMeierFitter() # 实例化KM估计器
km_recovery.fit(drawdown_data['duration'], drawdown_data['event'], label='回撤恢复') # 拟合生存曲线
fig, ax = plt.subplots(figsize=(10, 6)) # 创建画布
km_recovery.plot(ax=ax) # 绘制生存曲线
median_recovery = km_recovery.median_survival_time_ # 获取中位恢复时间
plt.axvline(median_recovery, color='r', linestyle='--', label=f'中位恢复时间: {median_recovery:.0f}天') # 参考线
ax.set_title('海康威视:股价回撤恢复时间生存曲线', fontsize=14) # 标题
ax.set_xlabel('天数', fontsize=12) # x轴标签
ax.set_ylabel('未恢复概率', fontsize=12) # y轴标签
ax.grid(True, alpha=0.3) # 网格线
plt.legend(prop={'family': 'Source Han Serif SC'}) # 图例
plt.tight_layout() # 调整布局
plt.show() # 显示
prob_30d = km_recovery.predict(30) # 30天未恢复概率
prob_180d = km_recovery.predict(180) # 180天未恢复概率
print(f'30天内无法恢复的概率: {prob_30d:.1%}') # 输出30天概率
print(f'半年(180天)内无法恢复的概率: {prob_180d:.1%}') # 输出180天概率
30天内无法恢复的概率: 44.7%
半年(180天)内无法恢复的概率: 13.2%
生存分析方法体系总结
| Kaplan-Meier |
非参数 |
估计生存曲线 |
| 对数秩检验 |
假设检验 |
比较两组/多组生存曲线 |
| Cox 比例风险 |
半参数 |
量化协变量对风险的影响 |
| 生存树 |
非参数 |
发现非线性关系和交互效应 |
| C-index |
评估指标 |
衡量模型预测排序能力 |
关键假设:
- 独立删失:删失机制不携带事件信息
- 比例风险(Cox模型):风险比不随时间变化
默认工作流:先用 KM 看整体曲线,再用对数秩或 Cox 回答比较与解释问题;只有当线性、比例风险或低维假设明显不够时,再转向树模型或 Lasso-Cox。
最小路线图:
- 先画 KM:确认事件大致集中在哪些时间段、删失比例大概有多高。
- 再做 log-rank 或 Cox:回答’组间是否不同’与’哪些因素在推高风险’。
- 最后才升级到树模型或 Lasso-Cox:前提是线性关系、比例风险假设或低维设定已经明显不够。
生存分析方法选择指南
根据研究问题选择方法:
| 估计总体生存曲线 |
KM估计 |
无协变量 |
| 比较两组生存差异 |
对数秩检验 |
分组变量为分类型 |
| 分析多因素影响 |
Cox模型 |
比例风险假设成立 |
| 高维变量选择 |
Lasso-Cox |
特征多,需稀疏解 |
| 捕捉非线性效应 |
生存树/RSF |
特征间有交互作用 |
生存分析在金融监管中的角色:
- 巴塞尔协议(Basel III)中的PD估计(违约概率)
- 信用评级迁移矩阵
- 保险精算中的生命表
结果阅读顺序:先确认删失机制与时间起点定义是否合理,再看方法是否匹配研究问题,最后才看 C-index、风险比和显著性结果。
底线提醒:C-index 高说明排序能力强,不等于违约概率或退市概率已经校准得很好;排序和校准是两件不同的事。