我来帮您生成一个完整的Python脚本来处理柒道表(多层表头)和L8表的智能对比。基于您提供的列名映射关系和表格结构,我将创建一个完整的、可运行的解决方案。
```python
"""
柒道表与L8表智能对比脚本
功能:自动解析多层表头、标准化列名、智能匹配对比
作者:数据分析助手
日期:2024年
"""
import pandas as pd
import numpy as np
import re
import os
from typing import Dict, List, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')
# ==================== 1. 配置参数 ====================
# 列名缩写映射字典(基于您提供的映射关系)
ABBREVIATION_MAP = {
# 指标类
"实": "实际", "标": "目标", "达": "达成", "比": "同比", "同": "同期",
"占": "占比", "折": "折扣", "效": "店效", "连": "连带",
# 维度类
"区": "区域", "店": "门店", "销": "销售", "月": "月度", "周": "周度",
"日": "日度", "年": "年度", "渠": "渠道", "类": "品类", "客": "客流",
# 其他常见缩写
"额": "金额", "量": "数量", "率": "率", "数": "数量"
}
# 常见组合映射(业务语义增强)
COMBO_MAP = {
"店销": "门店销售",
"销额": "销售额",
"销量": "销售量",
"客数": "客流量",
"连带率": "连带率",
"达成率": "达成率",
"达成%": "达成率",
"同比%": "同比增长率",
"占比%": "占比",
"店效": "店效",
"折扣": "折扣"
}
# ==================== 2. 核心函数定义 ====================
def standardize_abbreviation(text: str) -> str:
"""
将缩写列名标准化为全称
示例:'月_店销_实' → '月度_门店销售_实际'
"""
if pd.isna(text) or str(text).strip() == "":
return ""
result = str(text).strip()
# 1. 先替换常见组合(避免短词干扰)
for combo, full in COMBO_MAP.items():
result = result.replace(combo, full)
# 2. 替换单个缩写字符
for abbr, full in sorted(ABBREVIATION_MAP.items(), key=lambda x: -len(x[0])):
# 使用正则确保是独立的词,而不是部分词
pattern = r'(?<![a-zA-Z0-9\u4e00-\u9fa5])' + re.escape(abbr) + r'(?![a-zA-Z0-9\u4e00-\u9fa5])'
result = re.sub(pattern, full, result)
# 3. 清理特殊字符和下划线
result = re.sub(r'[\s_\u3000]+', '_', result) # 空格和全角空格转下划线
result = re.sub(r'^_+|_+$', '', result) # 去掉首尾下划线
result = re.sub(r'_+', '_', result) # 合并连续下划线
return result
def parse_qidao_multilevel_header(df_raw: pd.DataFrame, header_rows: int = 2) -> List[str]:
"""
解析柒道表的多层表头
返回标准化后的列名列表
"""
print("开始解析柒道表多层表头...")
# 如果已经是多层表头(MultiIndex)
if isinstance(df_raw.columns, pd.MultiIndex):
print("检测到MultiIndex表头")
headers = []
for col_tuple in df_raw.columns:
# 将多层表头合并为一个字符串
parts = []
for level in col_tuple:
if pd.notna(level) and str(level).strip() and not str(level).startswith('Unnamed'):
parts.append(str(level).strip())
if parts:
header_str = '_'.join(parts)
else:
header_str = ""
headers.append(standardize_abbreviation(header_str))
# 如果是单层表头但有多行数据需要合并
else:
print("检测到单层表头,尝试从数据中提取表头...")
# 读取前几行作为表头信息
header_data = df_raw.iloc[:header_rows].copy()
# 转置以便处理
header_data = header_data.T.reset_index(drop=True)
# 合并表头行
headers = []
for i in range(len(header_data)):
row_values = []
for row_idx in range(header_rows):
val = header_data.iloc[i, row_idx] if row_idx < len(header_data.columns) else ""
if pd.notna(val) and str(val).strip() and not str(val).startswith('Unnamed'):
row_values.append(str(val).strip())
if row_values:
header_str = '_'.join(row_values)
else:
header_str = f"column_{i}"
headers.append(standardize_abbreviation(header_str))
# 更新DataFrame(去掉表头行)
df_raw = df_raw.iloc[header_rows:].reset_index(drop=True)
# 处理空列名
for i, header in enumerate(headers):
if not header or header == "":
headers[i] = f"未命名列_{i}"
print(f"解析完成,共{len(headers)}列")
return headers, df_raw
def extract_l8_structure(df_l8: pd.DataFrame) -> Tuple[List[str], pd.DataFrame]:
"""
从L8表提取表头结构
假设前2行是表头信息
"""
print("开始解析L8表结构...")
# 读取前两行作为表头
row0 = df_l8.iloc[0].fillna("").astype(str).str.strip()
row1 = df_l8.iloc[1].fillna("").astype(str).str.strip()
cols = []
for i in range(len(row0)):
c0 = row0.iloc[i]
c1 = row1.iloc[i]
# 跳过空列名
if c0 == "" and c1 == "":
cols.append(f"未命名列_{i}")
continue
# 解析组合表头
time_part = ""
channel_part = ""
metric_part = ""
# 解析第一行(时间+渠道)
if "-" in c0:
parts = c0.split("-", 1)
time_part = parts[0].strip()
channel_part = parts[1].strip()
else:
# 启发式识别
for time_key in ["月", "周", "日", "年"]:
if time_key in c0:
time_part = time_key + ("度" if time_key != "日" else "")
channel_part = c0.replace(time_key, "").strip()
break
else:
channel_part = c0
# 第二行是指标
metric_part = c1
# 标准化各部分
time_std = standardize_abbreviation(time_part)
channel_std = standardize_abbreviation(channel_part)
metric_std = standardize_abbreviation(metric_part)
# 组合列名
parts_list = [time_std, channel_std, metric_std]
col_name = "_".join([p for p in parts_list if p])
if not col_name:
col_name = f"列_{i}"
cols.append(col_name)
# 更新DataFrame(去掉表头行)
df_l8_clean = df_l8.iloc[2:].reset_index(drop=True)
print(f"L8表解析完成,共{len(cols)}列")
return cols, df_l8_clean
def identify_columns_by_pattern(df: pd.DataFrame, pattern_dict: Dict[str, List[str]]) -> Dict[str, List[str]]:
"""
根据模式识别不同类型的列
"""
result = {}
all_columns = df.columns.tolist()
for col_type, patterns in pattern_dict.items():
matched_cols = []
for col in all_columns:
for pattern in patterns:
if pattern in col:
matched_cols.append(col)
break
result[col_type] = matched_cols
return result
def align_and_compare(df1: pd.DataFrame, df2: pd.DataFrame,
df1_name: str = "柒道表", df2_name: str = "L8表") -> pd.DataFrame:
"""
对齐两个表格并进行对比
"""
print(f"开始对齐对比 {df1_name} 和 {df2_name}...")
# 识别关键列
patterns = {
'date_columns': ['日期', '时间', '月', '周', '日', '年', 'period'],
'type_columns': ['类型', '业务', '品类', '渠道', '区域', '门店'],
'actual_columns': ['实际', '实绩', '实'],
'target_columns': ['目标', '计划', '标'],
'ratio_columns': ['达成率', '达成', '占比', '率', '比']
}
df1_cols = identify_columns_by_pattern(df1, patterns)
df2_cols = identify_columns_by_pattern(df2, patterns)
# 创建对比结果
comparison_results = []
# 尝试匹配相似的列
for col1 in df1.columns:
if col1 in ['', 'Unnamed: 0', '未命名列'] or 'Unnamed:' in str(col1):
continue
best_match = None
best_score = 0
for col2 in df2.columns:
if col2 in ['', 'Unnamed: 0', '未命名列'] or 'Unnamed:' in str(col2):
continue
# 计算相似度
score = calculate_similarity(col1, col2)
if score > best_score and score > 0.5: # 相似度阈值
best_score = score
best_match = col2
if best_match:
# 对比数据
for idx in range(min(len(df1), len(df2))):
val1 = df1.iloc[idx][col1] if idx < len(df1) else None
val2 = df2.iloc[idx][best_match] if idx < len(df2) else None
if pd.notna(val1) and pd.notna(val2):
try:
num1 = float(val1)
num2 = float(val2)
diff = num1 - num2
diff_pct = (diff / num2 * 100) if num2 != 0 else None
comparison_results.append({
'柒道表列名': col1,
'L8表列名': best_match,
'行索引': idx,
f'{df1_name}_值': num1,
f'{df2_name}_值': num2,
'绝对差异': diff,
'相对差异%': diff_pct,
'匹配相似度': best_score,
'状态': '匹配成功' if abs(diff_pct or 0) < 1 else '差异较大'
})
except:
# 非数值型数据,只记录是否一致
comparison_results.append({
'柒道表列名': col1,
'L8表列名': best_match,
'行索引': idx,
f'{df1_name}_值': val1,
f'{df2_name}_值': val2,
'绝对差异': None,
'相对差异%': None,
'匹配相似度': best_score,
'状态': '类型不一致' if val1 != val2 else '一致'
})
comparison_df = pd.DataFrame(comparison_results)
if not comparison_df.empty:
print(f"对比完成,共找到 {len(comparison_df)} 条匹配记录")
else:
print("未找到匹配的列,请检查表格结构")
return comparison_df
def calculate_similarity(str1: str, str2: str) -> float:
"""
计算两个字符串的相似度(基于共同字符和长度)
"""
if not str1 or not str2:
return 0.0
# 转换为小写并移除特殊字符
s1 = re.sub(r'[^\w\u4e00-\u9fa5]', '', str1.lower())
s2 = re.sub(r'[^\w\u4e00-\u9fa5]', '', str2.lower())
# 计算共同字符数
common_chars = set(s1) & set(s2)
# 计算相似度
if not s1 or not s2:
return 0.0
similarity = len(common_chars) / max(len(s1), len(s2))
return similarity
def generate_summary_report(comparison_df: pd.DataFrame) -> Dict:
"""
生成对比报告摘要
"""
if comparison_df.empty:
return {"error": "没有对比数据"}
summary = {
"total_matches": len(comparison_df),
"successful_matches": len(comparison_df[comparison_df['状态'] == '匹配成功']),
"large_differences": len(comparison_df[comparison_df['状态'] == '差异较大']),
"type_mismatches": len(comparison_df[comparison_df['状态'] == '类型不一致']),
}
# 计算平均差异
numeric_diff = comparison_df[comparison_df['相对差异%'].notna()]['相对差异%']
if not numeric_diff.empty:
summary["avg_difference_pct"] = numeric_diff.abs().mean()
summary["max_difference_pct"] = numeric_diff.abs().max()
summary["min_difference_pct"] = numeric_diff.abs().min()
# 找出差异最大的10条记录
if '相对差异%' in comparison_df.columns:
top_diff = comparison_df.copy()
top_diff['abs_diff'] = top_diff['相对差异%'].abs()
top_diff = top_diff.sort_values('abs_diff', ascending=False).head(10)
summary["top_differences"] = top_diff[['柒道表列名', 'L8表列名', '相对差异%', '状态']].to_dict('records')
return summary
def save_results(df_qidao: pd.DataFrame, df_l8: pd.DataFrame,
comparison_df: pd.DataFrame, summary: Dict, output_dir: str = "output"):
"""
保存处理结果
"""
os.makedirs(output_dir, exist_ok=True)
# 保存标准化后的表格
df_qidao.to_excel(f"{output_dir}/柒道表_标准化.xlsx", index=False)
df_l8.to_excel(f"{output_dir}/L8表_标准化.xlsx", index=False)
# 保存对比结果
comparison_df.to_excel(f"{output_dir}/对比报告.xlsx", index=False)
# 保存摘要报告
with open(f"{output_dir}/对比摘要.md", "w", encoding="utf-8") as f:
f.write("# 柒道表与L8表对比报告\n\n")
f.write(f"## 对比摘要\n")
f.write(f"- 总匹配记录数: {summary.get('total_matches', 0)}\n")
f.write(f"- 成功匹配数: {summary.get('successful_matches', 0)}\n")
f.write(f"- 差异较大数: {summary.get('large_differences', 0)}\n")
f.write(f"- 类型不一致数: {summary.get('type_mismatches', 0)}\n")
if 'avg_difference_pct' in summary:
f.write(f"- 平均相对差异: {summary['avg_difference_pct']:.2f}%\n")
f.write(f"- 最大相对差异: {summary['max_difference_pct']:.2f}%\n")
f.write(f"- 最小相对差异: {summary['min_difference_pct']:.2f}%\n")
f.write("\n## 差异最大的10条记录\n")
f.write("| 柒道表列名 | L8表列名 | 相对差异% | 状态 |\n")
f.write("|------------|----------|-----------|------|\n")
for record in summary.get('top_differences', []):
f.write(f"| {record['柒道表列名']} | {record['L8表列名']} | {record.get('相对差异%', 'N/A'):.2f}% | {record['状态']} |\n")
print(f"结果已保存到 {output_dir} 目录")
# ==================== 3. 主执行函数 ====================
def main():
"""
主执行函数
"""
print("=" * 60)
print("柒道表与L8表智能对比系统")
print("=" * 60)
try:
# ==================== 步骤1: 读取数据 ====================
print("\n1. 读取数据文件...")
# 请根据实际文件路径修改
qidao_file = "柒道表.xlsx" # 替换为您的柒道表文件路径
l8_file = "L8表.xlsx" # 替换为您的L8表文件路径
if not os.path.exists(qidao_file):
print(f"错误: 柒道表文件不存在: {qidao_file}")
print("请将柒道表.xlsx放在当前目录或修改文件路径")
return
if not os.path.exists(l8_file):
print(f"错误: L8表文件不存在: {l8_file}")
print("请将L8表.xlsx