针对“Python读取Excel表头,将每个表头数据存入独立DataFrame,对比表头是否一致并合并相同表头数据,支持从键盘输入后期路径且不限表头数量”的需求,解决方案的核心流程为:**1.交互式路径获取**、**2.按表头分组加载数据**、**3.同结构数据合并**[ref_1]。具体实现与代码示例如下。
### **1. 整体方案设计**
为处理任意数量、任意表头结构的Excel文件,需要建立一个以**表头结构**为键,以**包含该表头的所有DataFrame列表**为值的映射字典。这能高效实现“表头一致则数据合并”的逻辑。程序应包含三个主要模块:输入、处理、输出。
```python
import pandas as pd
import os
# 核心数据结构:{‘表头元组’: [df1, df2, ...]}
header_to_dataframes_map = {}
```
字典的键是标准化后的表头元组(`tuple`),值是一个列表,用于存储所有具有该表头的原始数据`DataFrame`。通过比较元组,即可判断表头是否完全一致 [ref_3]。
### **2. 从键盘交互式获取文件路径**
需要编写一个函数,允许用户持续输入文件路径,直到输入结束。
```python
def collect_file_paths_from_keyboard():
"""
从键盘标准输入循环收集文件路径,支持绝对路径和相对路径。
输入空行(直接按回车)表示结束输入。
返回:有效的文件路径列表。
"""
print("请逐个输入Excel文件的完整路径(如:C:\\data\\file1.xlsx)。")
print("输入完成后,直接按回车键结束输入。")
collected_paths = []
input_count = 1
while True:
try:
# 提示用户输入
user_input = input(f"[路径 {input_count}]: ").strip()
except (EOFError, KeyboardInterrupt):
print("\n输入被中断。")
break
# 检查是否结束输入
if user_input == "":
if input_count == 1:
print("未输入任何路径,程序将退出。")
else:
print("路径输入结束。")
break
# 检查文件是否存在且是否为支持的格式
if not os.path.isfile(user_input):
print(f" 警告:路径 '{user_input}' 不存在或不是一个文件,请重新输入。")
continue
if not (user_input.lower().endswith('.xlsx') or user_input.lower().endswith('.xls')):
print(f" 警告:文件 '{user_input}' 不是Excel格式(.xlsx/.xls),已跳过。")
continue
collected_paths.append(user_input)
print(f" 已添加:{user_input}")
input_count += 1
print(f"\n总计收集到 {len(collected_paths)} 个有效Excel文件路径。")
return collected_paths
# 使用示例
excel_file_list = collect_file_paths_from_keyboard()
```
此函数实现了灵活的交互式输入,并对无效路径或非Excel文件给出明确提示,确保后续流程的稳定性 [ref_2]。
### **3. 按表头分组加载数据**
遍历所有文件路径,使用`pandas`读取Excel文件,提取表头和数据,并按表头结构进行分组存储。
```python
def load_and_group_excel_files(file_path_list):
"""
加载所有Excel文件,并按完全相同的表头结构进行分组。
参数:
file_path_list: Excel文件路径列表。
返回:
grouped_data: 字典,{header_tuple: [list_of_dataframes]}
load_report: 列表,记录每个文件的加载详情,用于生成报告。
"""
grouped_data = {}
load_report = []
for idx, file_path in enumerate(file_path_list, 1):
file_name = os.path.basename(file_path)
try:
# 使用pandas读取Excel文件,假设表头在第一行
df = pd.read_excel(file_path, engine='openpyxl') # 对于.xlsx文件 [ref_2]
except Exception as e:
# 捕获读取异常,如文件损坏、格式错误等
print(f" [{idx}/{len(file_path_list)}] 错误:无法读取文件 '{file_name}'。原因:{e}")
load_report.append({
'文件': file_name,
'状态': '失败',
'原因': str(e),
'表头': None,
'数据行数': 0
})
continue
# 获取表头并转换为元组作为字典键
original_header = df.columns.tolist() # 获取原始列名列表 [ref_2]
header_key = tuple(original_header) # 转换为不可变、可哈希的元组
# 初始化该表头结构的存储列表(如果是首次遇到)
if header_key not in grouped_data:
grouped_data[header_key] = []
# 将当前文件的DataFrame存入对应分组
grouped_data[header_key].append(df)
# 记录加载详情
load_report.append({
'文件': file_name,
'状态': '成功',
'原因': 'N/A',
'表头': str(original_header),
'数据行数': df.shape[0]
})
print(f" [{idx}/{len(file_path_list)}] 已加载:'{file_name}' -> 表头({len(original_header)}列): {original_header[:3]}...")
return grouped_data, load_report
# 执行数据加载与分组
data_groups, file_report = load_and_group_excel_files(excel_file_list)
print(f"\n文件加载完成。共形成 {len(data_groups)} 个不同的表头结构分组。")
```
此函数是核心数据处理环节,它读取每个Excel文件,并通过`df.columns`获取表头 [ref_2],然后利用字典的键唯一性自动完成表头比对和分组。元组(`tuple`)的使用确保了表头可以作为字典的键进行比较。
### **4. 合并相同表头的数据并输出**
遍历分组字典,对每个表头分组下的所有`DataFrame`列表进行纵向合并,并将结果保存为新的Excel文件。
```python
def merge_and_export_groups(data_groups_dict, output_base_dir='./merged_excel_output'):
"""
合并每个表头分组的数据,并将合并结果导出为独立的Excel文件。
参数:
data_groups_dict: 由 load_and_group_excel_files 返回的分组字典。
output_base_dir: 合并文件输出目录。
返回:
merge_summary: 合并操作的摘要列表。
"""
import os
# 确保输出目录存在
os.makedirs(output_base_dir, exist_ok=True)
merge_summary = []
print("\n开始合并具有相同表头的文件...")
print("-" * 60)
for group_index, (header_tuple, df_list) in enumerate(data_groups_dict.items(), 1):
header_list = list(header_tuple)
file_count = len(df_list)
total_rows_original = sum(df.shape[0] for df in df_list)
# 核心合并操作:使用 concat 进行纵向堆叠 [ref_1]
merged_df = pd.concat(df_list, axis=0, ignore_index=True, sort=False)
# 生成输出文件名(避免文件名过长或非法字符)
# 使用前两列表头(如果存在)来命名,并进行安全处理
if len(header_list) >= 2:
name_part = f"{str(header_list[0])[:10]}_{str(header_list[1])[:10]}"
elif len(header_list) == 1:
name_part = str(header_list[0])[:20]
else:
name_part = f"header_group_{group_index}"
# 清理文件名中的非法字符
safe_name_part = "".join(c for c in name_part if c.isalnum() or c in (' ', '_')).rstrip()
safe_name_part = safe_name_part.replace(' ', '_')
output_file_name = f"merged_group_{group_index:03d}_{safe_name_part}.xlsx"
output_file_path = os.path.join(output_base_dir, output_file_name)
# 保存合并后的DataFrame到Excel
try:
merged_df.to_excel(output_file_path, index=False)
save_status = '成功'
except Exception as e:
save_status = f'失败({e})'
output_file_path = None
# 记录本次合并的摘要信息
summary_entry = {
'分组编号': group_index,
'表头列数': len(header_list),
'表头结构预览': str(header_list[:3]) + ('...' if len(header_list) > 3 else ''),
'包含文件数': file_count,
'合并前行数': total_rows_original,
'合并后行数': merged_df.shape[0],
'输出文件名': output_file_name,
'保存状态': save_status
}
merge_summary.append(summary_entry)
# 打印本次合并的进度信息
status_icon = '✅' if save_status == '成功' else '❌'
print(f"{status_icon} 分组 {group_index}: 合并了 {file_count} 个文件 -> {output_file_name} "
f"({total_rows_original}行 -> {merged_df.shape[0]}行)")
# 打印最终的合并摘要表格
print("\n" + "=" * 60)
print("数据合并汇总报告:")
if merge_summary:
summary_df = pd.DataFrame(merge_summary)
# 选择要展示的列
display_cols = ['分组编号', '表头列数', '包含文件数', '合并前行数', '合并后行数', '输出文件名', '保存状态']
print(summary_df[display_cols].to_string(index=False))
print(f"\n所有合并文件已保存至目录:{os.path.abspath(output_base_dir)}")
else:
print("没有需要合并的数据分组。")
return merge_summary
# 执行合并与导出
final_summary = merge_and_export_groups(data_groups)
```
该函数实现了数据的最终合并与持久化。`pd.concat` 是合并多个`DataFrame`的关键函数,`ignore_index=True` 参数确保合并后的索引是连续的 [ref_1]。每个分组的结果被保存为一个独立的Excel文件,文件名包含了分组编号和表头特征,便于追溯。
### **5. 生成详细的分析报告**
为提供更清晰的执行概览,可以生成一个详细的报告,展示每个原始文件的表头结构和其所属的分组情况。
```python
def generate_detailed_analysis_report(file_report_list, data_groups_dict):
"""
根据加载报告和分组字典,生成一个详细的DataFrame报告。
"""
# 为每个表头分组生成一个唯一的组ID(例如,使用哈希值后几位)
group_id_map = {}
for idx, header_tuple in enumerate(data_groups_dict.keys(), 1001):
group_id_map[header_tuple] = f"G{idx}"
# 构建报告行
report_rows = []
for file_info in file_report_list:
if file_info['状态'] == '成功':
# 查找该文件表头对应的组ID
# 注意:file_info['表头']是字符串,需要还原为元组进行查找
# 这里简化为在所有分组键中查找匹配(实际应用中应保存映射关系)
header_str = file_info['表头']
matched_group_id = "独立文件"
for header_tuple, group_id in group_id_map.items():
if str(list(header_tuple)) == header_str:
matched_group_id = group_id
break
else:
matched_group_id = "N/A"
report_rows.append({
'文件名': file_info['文件'],
'加载状态': file_info['状态'],
'失败原因': file_info['原因'],
'表头结构': file_info['表头'][:50] + '...' if file_info['表头'] and len(file_info['表头']) > 50 else file_info['表头'],
'数据行数': file_info['数据行数'],
'所属合并组': matched_group_id
})
analysis_df = pd.DataFrame(report_rows)
return analysis_df
# 生成并显示报告
if file_report:
print("\n=== 详细文件分析报告 ===")
analysis_report_df = generate_detailed_analysis_report(file_report, data_groups)
print(analysis_report_df.to_string(index=False))
```
此报告能帮助用户一目了然地看到哪些文件被成功加载、它们的表头是什么、数据量大小,以及最重要的——哪些文件因为表头相同而被归入了同一个合并组(如`G1001`)。
### **6. 完整可执行脚本整合**
将上述所有功能整合到一个完整的脚本中,并提供主函数入口。
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
功能:从键盘输入任意多个Excel文件路径,自动按表头结构分组,合并表头完全相同的文件。
"""
import pandas as pd
import os
def main():
print("=" * 70)
print(" Excel文件表头比对与自动合并工具")
print("=" * 70)
# 步骤1:获取文件路径
print("\n[步骤1/3] 请输入Excel文件路径。")
all_files = collect_file_paths_from_keyboard()
if not all_files:
return
# 步骤2:加载并分组文件
print(f"\n[步骤2/3] 正在加载并分析 {len(all_files)} 个文件...")
grouped_data, load_report = load_and_group_excel_files(all_files)
# 简单统计
success_files = [r for r in load_report if r['状态'] == '成功']
print(f" 成功加载文件: {len(success_files)} 个")
print(f" 失败文件: {len(load_report) - len(success_files)} 个")
print(f" 形成的表头分组数: {len(grouped_data)} 个")
if not grouped_data:
print("错误:没有成功加载任何可用的Excel数据,程序退出。")
return
# 步骤3:合并与导出
print(f"\n[步骤3/3] 正在合并数据并导出结果...")
output_directory = "./合并结果"
merge_summary = merge_and_export_groups(grouped_data, output_directory)
# 最终提示
print("\n" + "=" * 70)
print("程序执行完成!")
if merge_summary:
total_merged_groups = len([s for s in merge_summary if s['保存状态'] == '成功'])
print(f"成功生成了 {total_merged_groups} 个合并文件。")
print(f"请查看目录 '{os.path.abspath(output_directory)}' 获取结果。")
print("=" * 70)
# 确保主函数在脚本直接运行时被调用
if __name__ == "__main__":
main()
```
运行此脚本,用户只需根据提示输入文件路径,程序将自动完成加载、比对、合并和输出的全过程 [ref_1]。
### **7. 应用场景与高级扩展**
此方案适用于多种需要整合多源数据的业务场景 [ref_1]:
* **多部门数据汇总**:各部门每月提交的Excel报表模板可能相同,此脚本可自动将所有相同模板的文件合并成一个总表。
* **数据仓库ETL预处理**:在将多个来源的Excel数据导入数据库前,先按表头结构分类,便于映射到不同的目标数据表。
* **数据一致性检查**:通过生成的报告,可以快速识别出哪些文件的表头与标准模板不一致,辅助进行数据质量管控。
**潜在的高级扩展方向**:
1. **表头模糊匹配**:当前方案要求表头**完全一致**。可扩展为支持模糊匹配,例如,在比较前对表头进行清洗(统一大小写、去除空格、替换同义词),使用集合运算或编辑距离(如Levenshtein距离)来判断表头是否“足够相似”。
```python
def normalize_header(header_list):
"""表头标准化:转为小写,去除首尾空格。"""
return [str(col).strip().lower() for col in header_list]
# 在加载函数中使用
normalized_header = normalize_header(df.columns.tolist())
header_key = tuple(normalized_header) # 使用标准化后的表头作为键
```
2. **处理超大数据集**:当单个文件或合并后的数据极大时,可使用 `pandas` 的 `chunksize` 参数分块读取,或采用 `dask` 等库进行并行和惰性计算,以避免内存溢出。
3. **更灵活的输入输出**:扩展支持从文件夹批量读取、支持更多文件格式(如CSV、JSON),以及将合并结果导出到数据库或不同格式的文件。