import pandas as pd
import numpy as np
from datetime import datetime
print('=== 开始数据整合项目 ===\n')
# ============================================================================
# 1. Load data source A: daily price bars
# ============================================================================
print('[1/6] 加载数据源A:日度行情数据')
# Read the pre-adjusted price Parquet file (used instead of HDF5) ONCE, with
# all three tickers in a single `in` filter, instead of scanning the same
# file three separate times.
price_parquet_path = 'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet'
# Source order_book_id (.XSHG / .XSHE) -> exchange-suffixed symbol (.SH / .SZ).
order_book_to_symbol = {
    '601018.XSHG': '601018.SH',
    '002142.XSHE': '002142.SZ',
    '600519.XSHG': '600519.SH',
}
stock_data = pd.read_parquet(
    price_parquet_path,
    filters=[('order_book_id', 'in', list(order_book_to_symbol))],
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
# Label every row from its own order_book_id rather than stamping a constant
# per read. The pandas docs only guarantee row-wise filtering with the pyarrow
# engine; if the reader merely prunes row groups, stray rows of other tickers
# map to NaN here and are dropped by the isin() filter below.
stock_data['symbol'] = stock_data['order_book_id'].astype(str).map(order_book_to_symbol)
stock_data['datetime'] = pd.to_datetime(stock_data['trade_date'], format='%Y%m%d')
price_data = stock_data[(stock_data['datetime'] >= '2023-01-01') & (stock_data['datetime'] <= '2023-01-31')]
# Keep the three target stocks and replace codes with display names.
symbols_map = {
    '601018.SH': '宁波港',
    '002142.SZ': '宁波银行',
    '600519.SH': '贵州茅台'
}
price_data = price_data[price_data['symbol'].isin(list(symbols_map))].copy()
price_data['symbol'] = price_data['symbol'].map(symbols_map)
# Keep only the fields used downstream.
price_data = price_data[['datetime', 'symbol', 'open', 'high', 'low', 'close', 'volume']].copy()
price_data = price_data.rename(columns={'symbol': 'stock_name'})
# Parquet row order is not guaranteed to be chronological. Sort by stock and
# date so that any per-stock forward/backward fill sees rows in date order.
price_data = (
    price_data.sort_values(['stock_name', 'datetime'], kind='stable')
    .reset_index(drop=True)
)
print(f' 形状: {price_data.shape}')
print(f' 日期范围: {price_data["datetime"].min()} 至 {price_data["datetime"].max()}')
print(f' 股票数量: {price_data["stock_name"].nunique()}')
# ============================================================================
# 2. Build data source B: financial indicators (synthetic)
# ============================================================================
print('\n[2/6] 创建数据源B:财务指标数据')
# Financial snapshots are generated every 7 days (weekly), so they are much
# sparser than the daily price bars.
# NOTE: 2023-01-01 is a Sunday, so every 7-day step also lands on a Sunday,
# which is a non-trading day. An exact-date join against trading days will
# never match these rows; use an as-of (latest on-or-before) join instead.
financial_dates = pd.date_range(start='2023-01-01', end='2023-01-31', freq='7D')
# One record per (stock, date). Values come from numpy's global RNG, which is
# unseeded here, so runs are not reproducible unless the caller seeds it.
# Within a record, values are drawn in pe -> pb -> roe -> debt order.
financial_data = [
    {
        'stock_name': stock,
        'date': date,
        'pe_ratio': np.random.uniform(8, 50),
        'pb_ratio': np.random.uniform(0.5, 10),
        'roe': np.random.uniform(5, 25),
        'debt_ratio': np.random.uniform(20, 70),
    }
    for stock in symbols_map.values()
    for date in financial_dates
]
financial_df = pd.DataFrame(financial_data)
print(f' 形状: {financial_df.shape}')
print(f' 列: {financial_df.columns.tolist()}')
# ============================================================================
# 3. Build data source C: industry classification (static lookup table)
# ============================================================================
print('\n[3/6] 创建数据源C:行业分类数据')
# One row per stock, in column order: stock_name, industry, sector, region.
industry_data = pd.DataFrame(
    [
        ('宁波港', '港口航运', '交通运输', '华东-宁波'),
        ('宁波银行', '银行', '金融服务', '华东-宁波'),
        ('贵州茅台', '白酒', '消费品', '西南-贵阳'),
    ],
    columns=['stock_name', 'industry', 'sector', 'region'],
)
print(f' 形状: {industry_data.shape}')
print(industry_data)
# ============================================================================
# 4. Build data source D: analyst ratings (synthetic)
# ============================================================================
print('\n[4/6] 创建数据源D:分析师评级数据')
# A rating snapshot every 5 days for each stock, with randomly drawn values.
rating_dates = pd.date_range(start='2023-01-01', end='2023-01-31', freq='5D')
ratings = ['买入', '增持', '中性', '减持', '卖出']
# Sampling probabilities, positionally aligned with `ratings`.
rating_weights = [0.4, 0.3, 0.2, 0.05, 0.05]
# NOTE(review): the same 80-120 target-price range is used for every stock,
# independent of each stock's own price level.
rating_data = [
    {
        'stock_name': name,
        'rating_date': day,
        'rating': np.random.choice(ratings, p=rating_weights),
        'target_price': np.random.uniform(80, 120),
    }
    for name in symbols_map.values()
    for day in rating_dates
]
rating_df = pd.DataFrame(rating_data)
print(f' 形状: {rating_df.shape}')
print(' 评级分布:')
print(rating_df['rating'].value_counts())
# ============================================================================
# 5. Integrate all data sources
# ============================================================================
print('\n[5/6] 整合所有数据源')
# Step 1: attach static industry attributes (one row per stock).
merged = pd.merge(price_data, industry_data, on='stock_name', how='left')

# Steps 2-3 attach, for each stock, the most recent financial snapshot and
# analyst rating dated ON OR BEFORE each price row's date (an as-of join).
# An exact-date join followed by fills is wrong here:
#   * snapshot dates with no matching price row (e.g. weekends/holidays)
#     never match, so those values are silently lost;
#   * a backward fill (bfill) copies a LATER rating onto earlier days, which
#     is look-ahead bias; a published rating should only apply going forward.
# merge_asof(direction='backward') avoids both. It requires sorted join keys
# of identical dtype, so the key is normalized and cast to datetime64[ns].
merged['date_only'] = (
    pd.to_datetime(merged['datetime']).dt.normalize().astype('datetime64[ns]')
)
merged = merged.sort_values('date_only', kind='stable')

# Step 2: financial indicators.
financial_cols = ['pe_ratio', 'pb_ratio', 'roe', 'debt_ratio']
financial_df['date_only'] = (
    pd.to_datetime(financial_df['date']).dt.normalize().astype('datetime64[ns]')
)
merged = pd.merge_asof(
    merged,
    financial_df[['stock_name', 'date_only', *financial_cols]].sort_values('date_only'),
    on='date_only',
    by='stock_name',
    direction='backward',
)

# Step 3: analyst ratings. Each rating stays in force until the next one.
rating_df['rating_date'] = pd.to_datetime(rating_df['rating_date']).dt.normalize()
rating_side = rating_df[['stock_name', 'rating_date', 'rating', 'target_price']].rename(
    columns={'rating_date': 'date_only'}
)
rating_side['date_only'] = rating_side['date_only'].astype('datetime64[ns]')
merged = pd.merge_asof(
    merged,
    rating_side.sort_values('date_only'),
    on='date_only',
    by='stock_name',
    direction='backward',
)

# Step 4: drop the helper key and restore per-stock chronological order.
merged = (
    merged.drop(columns=['date_only'])
    .sort_values(['stock_name', 'datetime'], kind='stable')
    .reset_index(drop=True)
)
print(f' 整合后形状: {merged.shape}')
print(f' 缺失值: {merged.isna().sum().sum()}')
# ============================================================================
# 6. Build the integration report (printed to stdout only)
# ============================================================================
print('\n[6/6] 数据整合报告')
print('=' * 60)

# Per-column completeness: percentage of non-missing values.
print('\n数据完整性:')
completeness = (1 - merged.isna().mean()) * 100
for column_name, pct in completeness.items():
    print(f' {column_name}: {pct:.2f}%')

# Overall coverage of the integrated table.
print('\n数据范围:')
first_day = merged['datetime'].min()
last_day = merged['datetime'].max()
print(f' 总记录数: {len(merged)}')
print(f' 日期范围: {first_day} 至 {last_day}')
print(f' 股票数量: {merged["stock_name"].nunique()}')

# Per-stock price and volume summary.
print('\n按股票统计:')
per_stock_spec = {'close': ['mean', 'min', 'max'], 'volume': 'sum'}
stock_stats = merged.groupby('stock_name').agg(per_stock_spec).round(2)
print(stock_stats)

# Per-industry price and volume summary.
print('\n按行业统计:')
per_industry_spec = {'close': 'mean', 'volume': 'sum'}
industry_stats = merged.groupby('industry').agg(per_industry_spec).round(2)
print(industry_stats)

# Frequency of each rating across all rows.
print('\n评级分布:')
rating_dist = merged['rating'].value_counts()
print(rating_dist)

# Show a sample of the final integrated table.
print('\n最终整合数据样本(前10行):')
display_cols = [
    'datetime', 'stock_name', 'industry', 'close', 'volume',
    'pe_ratio', 'pb_ratio', 'rating', 'target_price',
]
print(merged.loc[:, display_cols].head(10).to_string())

# Export hints: these lines are only printed; nothing is written to disk.
export_lines = (
    '\n=== 导出建议 ===',
    '整合后的数据可以:',
    '1. 导出为CSV: merged.to_csv("integrated_data.csv")',
    '2. 导出为Excel: merged.to_excel("integrated_data.xlsx")',
    '3. 导出为Parquet: merged.to_parquet("integrated_data.parquet")',
    '4. 导出为HDF5: merged.to_hdf("integrated_data.h5", key="data")',
)
for line in export_lines:
    print(line)