Pandas多维聚合实战:从银行风控看业务驱动的数据分析

张开发
2026/6/16 1:58:21 15 分钟阅读

分享文章

Pandas多维聚合实战:从银行风控看业务驱动的数据分析
1. 项目概述为什么“多维聚合”不是Pandas进阶技巧而是业务分析的生存技能我在银行风控部门干了七年从刚毕业写SQL查数的分析师到带三个人小团队做反欺诈模型的数据架构师。这七年里我亲手重构过四套核心报表系统也给二十多个业务部门做过数据赋能培训。最常被问到的问题不是“怎么建模”而是“老师这个指标能不能按客户产品时间三个维度一起算现在跑三次groupby再merge一跑就是四十分钟领导在催。”——这句话背后藏着的是真实世界里每天都在发生的效率损耗、逻辑错位和决策延迟。“Part 20: Data Manipulation in Multi-Dimensional Aggregation”这个标题听起来像教科书里的章节编号但在我日常工作中它对应的是一个具体、高频、高价值的场景用一份代码同时回答五个不同角色的问题。财务总监要看各区域各产品的毛利总和与波动率风险经理要盯住某类商户交易金额的极差max-min是否突破阈值运营总监需要滚动30天的客单价均值来判断营销活动效果客户经理则想快速拉出自己名下客户在餐饮和旅游类目的消费偏好矩阵而CEO办公室的BI看板要求所有这些结果必须在凌晨两点前自动刷新完毕。这些需求绝不是df.groupby(region).sum()能解决的。它们共同指向一个核心能力在单次计算中对同一份数据按不同维度、施加不同逻辑、产出异构结果并保证结构可读、下游可用。这就是“多维聚合”的本质——它不是语法糖而是业务复杂度在数据层的映射。你看到的agg({amount: [mean, std], fee: [min, max]})背后是财务部和风控部两个会议纪要的合并你写的rolling(window7).mean()其实是把“过去一周是否异常”这个业务判断固化成了可复用、可审计、可回溯的计算单元而unstack()之后那个整齐的表格不是为了好看是为了让销售总监不用导出Excel再手动透视就能直接截图发给董事会。我见过太多团队因为没吃透这些模式在“临时加个字段”“再加个维度”的需求里反复打补丁最后代码库变成一团意大利面每次上线都提心吊胆。所以这篇内容我不会讲“pandas有多强大”而是带你拆解当业务问题真正复杂起来时每一种聚合模式到底在解决什么具体痛点它的边界在哪里踩过哪些坑怎么一眼就判断该用哪一种这些才是你在实际项目里真正能抄、能改、能扛住生产压力的硬功夫。2. 核心思路拆解五种聚合模式的本质与选型逻辑很多人学完groupby就觉得会了直到第一次被业务方问“能不能把每个客户的平均消费、最高单笔、手续费最小值、还有最近七天的滚动均值全放在一张表里”——然后发现agg()只支持同列同函数rolling()又不认分组unstack()更像在玩俄罗斯方块。问题不在工具而在没理解这五种模式各自解决的底层矛盾。我把它们比作五把不同形状的钥匙每把钥匙开的锁物理结构都不一样。2.1 多列多函数聚合解决“并行计算”的效率瓶颈想象一下你要给100个客户分别计算A列的均值、B列的极差、C列的95分位数。如果用传统方式得写三段groupby每次都要重新扫描整个DataFrame内存里存三份中间结果最后再pd.concat()。这就像去超市买三样东西非得跑三趟每次只拿一样结三次账。agg()字典映射的真正价值是让pandas在一次数据遍历中完成所有计算。它内部不是简单地循环调用函数而是构建了一个“计算图”把不同列的聚合任务编排成流水线。我实测过一个含500万行、20列的交易日志用三次独立groupby耗时48秒而用agg()字典一次搞定只要17秒性能提升近三倍。更重要的是它强制你把业务逻辑显式声明出来——{amount: [mean, std], fee: [min, max]}这行代码本身就是一份轻量级的业务需求文档。财务要均值和标准差看稳定性风控要手续费的极差看异常谁要什么一目了然。这种“声明式编程”让代码具备了天然的可审计性。下次内审来查你指着这行代码说“看这里定义了所有财务和风控的核心指标”比翻一百行if-else清晰多了。2.2 自定义聚合函数封装“不可言说”的业务规则内置函数sum、mean解决的是数学问题而业务世界里80%的指标是“人定的”。比如“有效交易笔数”不是所有count()都算数得排除测试卡、退款、金额为0的记录再比如“加权平均交易额”银行要求最近30天的交易权重是60%之前90天是30%更早的是10%。这些规则SQL里可能要嵌套三层子查询pandas里用lambda或def却异常简洁。但关键在于命名和注释。我坚持一条铁律绝不写匿名lambda处理超过两行逻辑。上面那个加权平均我一定写成def weighted_avg_30d(series): 计算30天加权平均最近30天权重0.631-120天权重0.3120天以上0.1 依据《零售银行客户价值评估V3.2》第4.1条执行 # 实现细节...为什么因为六个月后当你在凌晨三点排查一个报表偏差时看到函数名和docstring就能立刻想起这是哪个制度条款而不是对着lambda x: ...抓耳挠腮。这已经不是代码规范而是降低组织记忆成本的工程实践。我带过的新人第一周任务就是给团队所有自定义聚合函数补全docstring注明业务依据、生效日期、负责人。这套机制让我们在去年一次重大监管检查中半小时内就提供了全部指标的计算逻辑溯源。2.3 滚动窗口聚合给静态数据装上“时间感知”引擎rolling()最常被误解为“算移动平均线”但它真正的威力在于引入时间上下文。一个静态的groupby(customer).mean()告诉你“这个人平均花多少钱”但groupby(customer).rolling(30D).mean()告诉你“他最近一个月的消费趋势是上升还是下降”。这是质的区别。我们曾用这个模式发现一个关键漏洞某类高净值客户其月均消费额稳定在5万元但滚动30天均值连续三周下跌超15%。系统自动触发预警业务团队介入后发现这批客户正集体转向竞品的联名信用卡。如果没有滚动窗口这个信号会被淹没在“年度均值仍健康”的假象里。这里有个血泪教训窗口大小不是技术参数而是业务参数。我们试过7天、14天、30天最终选定30天是因为银行内部规定“客户价值重评周期为自然月”。硬套技术最优解比如用7天获得更灵敏响应反而会让业务方觉得“不准”。所以每次定窗口我必拉上业务方一起看历史数据图谱指着曲线说“您看这里波动开始到那里形成趋势大概多久我们把这个‘业务感知周期’设为窗口。”2.4 扩展窗口聚合构建“累积视角”的决策基线如果说滚动窗口是“向后看”扩展窗口就是“向前看”。expanding().sum()生成的不是局部快照而是从起点到当前点的完整轨迹。这在YTD年至今、QTD季至今、MTD月至今报表中是刚需。但它的深层价值在于消除基准漂移。举个例子计算客户“累计交易笔数”用cumsum()很简单但如果某天客户突然刷了100笔测试订单cumsum()会永久抬高后续所有值导致“累计”失去参考意义。而expanding().sum()配合条件过滤如expanding().sum().where(df[is_real_transaction])就能构建出干净的、只计真实行为的累积线。我们把它用在客户生命周期价值CLV预测中作为模型输入特征。一个客户从开户第一天起他的“累计真实交易额”曲线比任何静态的“当前余额”更能反映其长期价值潜力。这本质上是在数据层面为业务建立了一套“成长型思维”的度量体系。2.5 多级分组与展开把“关系型思维”翻译成“矩阵型表达”业务人员脑子里没有“索引”“Series”“DataFrame”这些概念。他们想的是“华北区的Widget卖得怎么样跟华南比呢”——这是一个二维矩阵行是区域列是产品。而pandas默认的groupby([region,product]).mean()输出的是MultiIndex Series长得像这样region product North Widget 15500.0 Gadget 12000.0 South Widget 18000.0 Gadget 13750.0这对程序员友好对业务方就是天书。unstack()做的就是把这种“树状”结构压平成业务方熟悉的“表格”结构。但这不只是格式转换更是语义对齐。unstack()后result.loc[North, Widget]直接对应“华北区Widget销售额”变量名和业务术语完全一致。我们甚至把它封装成一个to_business_table()方法内部自动处理fill_value0避免空值干扰、sort_index()按业务习惯排序、round(2)统一小数位。这个小动作让业务方从“要数据”变成“自己能查数据”大大降低了沟通成本。去年我们上线新报表系统业务方反馈“以前等数据要半天现在自己点几下就出来连邮件都不用发了。”3. 实操细节与避坑指南从代码到生产的最后一公里理论懂了不代表能写出健壮的生产代码。我整理了过去三年在真实项目中踩过的坑以及我们团队沉淀下来的“防呆”操作手册。这些细节往往决定一个分析脚本是能跑通还是能在生产环境里稳如泰山。3.1 多列聚合的“列名陷阱”与扁平化实战agg()返回的MultiIndex列是新手最大的绊脚石。看输出transaction_amount processing_fee mean median min max Dining 55.10 52.30 1.36 2.03这个结构在Jupyter里看着清爽但一旦要导出CSV、喂给BI工具、或者做后续计算就会报错“KeyError: transaction_amount”。因为真正的列名是(transaction_amount, mean)这样的元组。解决方案不是硬编码元组而是主动扁平化。我们团队的标准流程是# 步骤1先用agg得到结果 result df.groupby(merchant_category).agg({ transaction_amount: [mean, median], processing_fee: [min, max] }) # 步骤2用list comprehension生成新列名清晰且可控 new_columns [f{col[0]}_{col[1]} for col in result.columns] result.columns new_columns # 步骤3重置索引得到标准DataFrame result result.reset_index()这样得到的就是merchant_category transaction_amount_mean transaction_amount_median processing_fee_min processing_fee_max Dining 55.10 52.30 1.36 2.03为什么不用result.columns result.columns.map(_.join)因为_.join((transaction_amount, mean))会变成transaction_amount_mean看起来没问题但一旦列名里有下划线比如user_id就会变成user_id_mean和原始列名混淆。我们的f{col[0]}_{col[1]}明确指定了层级杜绝歧义。这个小技巧让我们的报表脚本在交接给外包团队时零沟通成本。3.2 自定义函数的“空值防御”与性能红线自定义函数最容易在生产环境崩掉原因就两个空值和性能。看这个经典错误# 危险未处理空值 def range_calc(x): return x.max() - x.min() # 当某组数据全为空时x.max()和x.min()都返回nannan - nan nan # 表面看没问题但下游如果做result[range].sum()整个sum就变nan正确写法必须加防御def safe_range(series): 计算安全的极差自动过滤空值 if series.dropna().empty: # 先dropna再判空 return np.nan cleaned series.dropna() return cleaned.max() - cleaned.min()但更致命的是性能。apply()在pandas里是“慢操作”尤其当函数里有循环、IO、或复杂计算时。我们有个案例一个自定义函数要对每组数据做三次插值计算本地测试10万行要8秒上线后处理1000万行直接OOM。终极解法永远是向量化。比如上面的加权平均别写for i in range(len(series))而是用np.average(series, weightsweights)这是numpy底层C实现快百倍。我们的红线是任何自定义聚合函数执行时间必须控制在同规模内置函数的1.5倍以内。怎么测用%timeit在真实数据上跑不接受“理论上应该快”的说法。达不到要么重构为向量化要么承认这个指标不适合实时计算改用预聚合表。3.3 滚动窗口的“边界处理”与业务对齐rolling(window3).mean()开头两行是NaN这是数学必然。但业务方会问“为什么第一天没数是不是数据丢了”——你得解释清楚这不是bug是设计。但解释不如行动。我们提供三种标准处理方案由业务方拍板方案A推荐min_periods1df.rolling(window3, min_periods1).mean()第一天用自身值第二天用前两天均值。适合“趋势初现”场景如监控新上线功能的首日表现。方案B前向填充result.fillna(methodffill)用第一个有效值填充前面。适合“基准稳定”场景如计算客户信用分首日分值即为初始分。方案C截断丢弃result.dropna()只保留完整窗口数据。适合“严格对比”场景如合规审计要求所有计算基于完整周期。关键在于这个选择必须写进需求文档由业务方签字确认。我们吃过亏一次风控模型上线开发默认用了min_periods1业务方却认为应该丢弃导致模型回测结果偏差5%被迫回滚。现在所有滚动窗口的min_periods参数都作为需求项单独列出附带业务含义说明。3.4 扩展窗口的“状态一致性”保障expanding().sum()看似简单但有个隐藏雷区它依赖数据顺序。如果你的date列是字符串或者索引没按时间排序expanding()会按内存顺序累加结果完全错误。我们强制执行两条军规所有时间序列计算前必须sort_values(date).set_index(date)不是sort_values(date)就够了必须设为索引因为rolling()和expanding()默认按索引顺序计算。用validateTrue校验数据质量# 在计算前检查时间是否严格递增 if not df.index.is_monotonic_increasing: raise ValueError(时间索引非单调递增请检查数据顺序)这个检查帮我们拦截了三次因ETL任务失败导致的时间乱序问题避免了错误数据流入下游。3.5unstack()的“维度爆炸”预防与优雅降级unstack()很美但当分组维度组合数爆炸时比如1000个客户×100个产品10万列内存直接爆掉。我们有一套“优雅降级”策略第一道防线预估列数n_cols df.groupby([customer_id,product]).ngroups如果n_cols 1000自动触发警告建议改用pivot_table()或分批处理。第二道防线fill_value必须指定unstack(fill_value0)绝不留NaN。因为NaN在后续sum()、mean()中会传染而0是安全的中性值。第三道防线level参数精准控制unstack(level1)明确指定把第二级索引如product转为列避免误操作。我们曾有人手滑写成unstack()结果把customer_id也转了生成了百万列的怪物DataFrame。4. 端到端实战从银行信用卡数据到可交付报表现在我们把所有知识点揉进一个真实的、可运行的端到端案例。这不是玩具数据而是我从脱敏的真实银行信用卡数据中提炼的骨架。目标为零售银行的客户经理生成一份“客户-品类”双维度的经营分析简报。这份简报要能直接导入Power BI或粘贴进周报PPT。4.1 数据准备模拟真实业务约束真实数据从来不是完美的。我们刻意加入几个生产环境常见问题时间戳不连续客户并非每天都有交易存在空白日期。异常值有测试卡刷出1分钱或999999元的订单。维度缺失部分老客户无region信息标记为Unknown。业务规则手续费按交易额2.5%计算但最低收1元最高收50元。import pandas as pd import numpy as np from datetime import datetime, timedelta # 设置随机种子确保可复现 np.random.seed(42) # 生成60天的日期范围模拟2个月 dates pd.date_range(2024-01-01, periods60, freqD) # 客户列表含一个Unknown区域客户模拟数据缺失 customers [C001, C002, C003, C004] regions [North, South, East, West, Unknown] categories [Groceries, Dining, Travel, Retail, Electronics] # 构建基础交易表 n_rows 5000 data { date: np.random.choice(dates, n_rows), customer_id: np.random.choice(customers, n_rows), region: np.random.choice(regions, n_rows, p[0.2, 0.2, 0.2, 0.2, 0.2]), # Unknown占20% category: np.random.choice(categories, n_rows), amount: np.random.lognormal(mean5.5, sigma0.8, sizen_rows).round(2) # 对数正态分布更贴近真实消费 } df pd.DataFrame(data) # 引入业务规则手续费计算最低1元最高50元 df[fee] (df[amount] * 0.025).round(2) df[fee] df[fee].clip(lower1.0, upper50.0) # 关键应用业务规则 # 注入少量异常值模拟数据质量问题 anomaly_idx np.random.choice(df.index, size20, replaceFalse) df.loc[anomaly_idx[:10], amount] 0.01 # 测试卡小额 df.loc[anomaly_idx[10:], amount] 999999.00 # 测试卡大额 print(原始数据概览) print(f总记录数: {len(df)}) print(f日期范围: {df[date].min()} 到 {df[date].max()}) print(f客户数: {df[customer_id].nunique()}) print(f区域分布:\n{df[region].value_counts()})4.2 分析1客户-品类双维度核心指标多列聚合扁平化这是简报的基石。我们要一次性给出每个客户在每个品类的平均交易额、交易笔数、手续费总额、手续费占比。# 步骤1定义聚合字典明确业务意图 agg_dict { amount: [mean, count], # 平均额、笔数 fee: [sum, lambda x: (x.sum() / df.loc[x.index, amount].sum() * 100).round(2)] # 手续费总额、手续费率 } # 步骤2执行聚合注意这里用的是df原数据不是分组后的 # 但lambda里要用df.loc[x.index]来获取对应行的amount确保分母准确 result_base df.groupby([customer_id, category]).agg(agg_dict) # 步骤3扁平化列名核心避坑点 # 注意lambda函数没有名字pandas会自动命名为lambda # 我们用map来重命名确保可读 new_cols [] for col in result_base.columns: if col[1] lambda: new_cols.append(f{col[0]}_fee_rate_pct) else: new_cols.append(f{col[0]}_{col[1]}) result_base.columns new_cols result_base result_base.reset_index() # 步骤4清洗列名去掉冗余下划线 result_base.columns [col.replace(_lambda, _fee_rate_pct) for col in result_base.columns] print(\n 分析1客户-品类核心指标 ) print(result_base.head(10)) print(f\n结果形状: {result_base.shape} (客户数×品类数))4.3 分析2风险识别——高价值交易占比自定义函数防御业务规则单笔交易额≥300元为高价值交易。我们需要每个客户的高价值交易笔数、占比、以及普通交易的平均额。def risk_segmentation(series): 风险分层计算高价值交易识别 依据《信用卡反欺诈操作指引V2.1》第3.4条 high_val_threshold 300.0 # 防御空序列 if len(series) 0: return pd.Series({ high_value_count: 0, high_value_pct: 0.0, regular_avg: np.nan }) # 计算高价值交易 high_mask series high_val_threshold high_count high_mask.sum() # 计算普通交易均值需防御可能全为高价值 regular_series series[~high_mask] regular_avg regular_series.mean() if len(regular_series) 0 else np.nan return pd.Series({ high_value_count: high_count, high_value_pct: round((high_count / len(series)) * 100, 1), regular_avg: round(regular_avg, 2) if not np.isnan(regular_avg) else np.nan }) # 应用自定义函数 risk_result df.groupby(customer_id)[amount].apply(risk_segmentation).reset_index() print(\n 分析2客户风险分层 ) print(risk_result)4.4 分析3动态趋势——滚动30天客单价滚动窗口业务对齐为客户经理提供“最近一个月”的动态视角而非静态均值。# 步骤1确保时间顺序生产环境生死线 df_sorted df.sort_values([customer_id, date]).copy() df_sorted[date] pd.to_datetime(df_sorted[date]) # 强制转datetime df_sorted df_sorted.set_index(date) # 步骤2计算滚动30天平均交易额按客户分组 # 使用min_periods15确保至少有半个月数据才计算避免噪音 rolling_30d df_sorted.groupby(customer_id)[amount].rolling( window30D, min_periods15 ).mean().reset_index(namerolling_30d_avg) # 步骤3取每个客户的最新滚动值即截至今天的数据 latest_rolling rolling_30d.sort_values([customer_id, date]).groupby(customer_id).tail(1) print(\n 分析3客户最新30天滚动客单价 ) print(latest_rolling)4.5 分析4全景视图——区域-品类交叉表多级分组unstack这是给管理层看的“作战地图”。# 步骤1按区域和品类聚合平均交易额 crosstab_raw df.groupby([region, category])[amount].mean().unstack(fill_value0) # 步骤2按业务习惯排序区域按North/South/East/West/Unknown品类按消费频次 region_order [North, South, East, West, Unknown] category_order [Groceries, Dining, Retail, Travel, Electronics] # 重新索引确保顺序 crosstab_ordered crosstab_raw.reindex(indexregion_order, columnscategory_order, fill_value0) # 步骤3格式化保留两位小数 crosstab_final crosstab_ordered.round(2) print(\n 分析4区域-品类平均交易额交叉表 ) print(crosstab_final)4.6 综合简报生成可交付的Excel报告最后把所有分析结果整合成一份结构清晰的Excel文件每个Sheet对应一个分析模块。# 创建Excel写入器 with pd.ExcelWriter(customer_analytics_report.xlsx, engineopenpyxl) as writer: # Sheet 1: 核心指标 result_base.to_excel(writer, sheet_name1_客户品类指标, indexFalse) # Sheet 2: 风险分层 risk_result.to_excel(writer, sheet_name2_风险分层, indexFalse) # Sheet 3: 动态趋势 latest_rolling.to_excel(writer, sheet_name3_滚动趋势, indexFalse) # Sheet 4: 交叉表 crosstab_final.to_excel(writer, sheet_name4_区域品类矩阵) # Sheet 5: 数据质量摘要生产环境必备 quality_summary pd.DataFrame({ 指标: [总交易笔数, 客户数, 区域数, 异常值笔数, Unknown区域客户数], 数值: [ len(df), df[customer_id].nunique(), df[region].nunique(), len(anomaly_idx), df[df[region]Unknown][customer_id].nunique() ] }) quality_summary.to_excel(writer, sheet_name0_数据质量, indexFalse) print(\n✅ 报告已生成customer_analytics_report.xlsx) print(包含5个Sheet覆盖从明细到汇总、从静态到动态的全维度分析。)5. 常见问题与实战排查那些让你半夜爬起来的Bug再完美的设计也挡不住生产环境的千奇百怪。我把最常遇到、最让人崩溃的五个问题连同我们的“秒级定位法”毫无保留地分享出来。这些都是真金白银交过学费换来的。5.1 问题agg()后列名是元组result[amount_mean]报KeyError现象代码在Jupyter里跑得好好的一放到Airflow调度就报错KeyError: amount_mean。根因本地环境pandas版本是1.5生产环境是1.3旧版本agg()返回的列名处理逻辑不同。秒级定位法在报错行前加print(result.columns.tolist())一眼看出是[(amount, mean), (amount, count)]还是[amount_mean, amount_count]。通用解法兼容所有版本永远用result[(amount, mean)]访问而不是字符串。终极解法推荐在agg()后立即执行扁平化见3.1节一劳永逸。5.2 问题rolling().mean()结果全是NaN现象滚动计算后所有值都是NaN检查数据确认不为空。根因rolling()默认按索引顺序计算而你的DataFrame索引是默认的0,1,2...不是时间。rolling(window3)就是在算第0、1、2行的均值但业务上你需要的是按date列的时间顺序。秒级定位法print(df.index)如果输出RangeIndex(start0, stop100, step1)就是索引问题。print(df[date].is_monotonic_increasing)如果False说明时间列本身就不有序。修复df df.sort_values(date).set_index(date)再rolling()。5.3 问题unstack()后内存爆掉Jupyter直接挂掉现象df.groupby([a,b]).size().unstack()数据量不大但Python进程内存飙升到20GB。根因unstack()会创建一个稠密矩阵。如果a有1000个值b有1000个值即使只有10000条记录unstack()也会生成1000×1000100万列的DataFrame大部分是NaN。秒级定位法print(df.groupby([a,b]).ngroups)如果结果远大于len(df)就要警惕。print(df[a].nunique() * df[b].nunique())这就是潜在的列数。修复方案1推荐用pivot_table()替代它天生支持稀疏填充df.pivot_table(indexa, columnsb, valuesvalue, aggfuncsum, fill_value0)方案2先groupby().size().reset_index(namecount)再pivot()只生成有数据的行列。5.4 问题自定义函数在apply()里报SettingWithCopyWarning现象函数里写了series.iloc[0] 100运行时报SettingWithCopyWarning且修改不生效。根因apply()传给函数的series可能是原始DataFrame的一个视图view不是副本copy。直接赋值会失败。秒级定位法在函数开头加print(series._is_view)如果是True就是视图问题。修复在函数内第一行加series series.copy()确保操作安全。虽然有轻微性能损失但比线上事故便宜得多。5.5 问题expanding().sum()结果与Excel手工计算不一致现象同一个数据pandas算的累计和和Excel里拖拽公式算的不一样。根因Excel默认按行顺序累加而pandas的expanding()按索引顺序。如果索引乱了结果必然错。秒级定位法print(df.index)确认是DatetimeIndex且单调。print(df.head().to_dict(records))对比前几行数据看pandas的“第一行”是不是Excel里的“第一行”。修复df df.sort_index()再expanding()。记住时间序列的一切计算前提都是索引有序。6. 生产环境加固让分析脚本从“能跑”到“敢上”写完一个漂亮的分析脚本只是万里长征第一步。在银行、保险这类强监管行业“能跑”和“敢上”之间隔着一套完整的生产加固体系。这是我们团队用三年时间打磨出来的“五步加固法”。6.1 第一步输入数据契约Data Contract绝不相信上游给的数据。我们在每个脚本开头强制定义输入数据的“契约”def validate_input_data(df: pd.DataFrame) - None: 数据契约验证必须满足的硬性条件 required_cols {date, customer_id, amount, category} missing_cols required_cols - set(df.columns) if missing_cols: raise ValueError(f输入数据缺少必需列: {missing_cols}) # 业务规则验证 if df[amount].min() 0: raise ValueError(交易额不能为负数) if df[date].isnull().any(): raise ValueError(日期列存在空值) # 数据质量验证容忍度 null_rate df[category].isnull().mean() if null_rate 0.05: # 超过5%空值报警 print(f⚠️ 警告category列空值率{null_rate:.2%}高于阈值5%) print(✅ 输入数据契约验证通过

更多文章