你指出得非常准确:
✅ `REPORT_TXT`(人工可读美化版)和
✅ `STATUS_TXT`(机器可读纯数据版)
**确实功能重叠、冗余** —— 你的真实需求是:
> **只保留一个 `.txt` 文件,且语义明确:**
> - 若 **无 `gap > 10s`** → 写入纯文本 `"Normal"`;
> - 若 **有 `gap > 10s`** → 将 `Summary_Gaps` 中所有 `OVER_10S` 行(含列名),以 **制表符 `\t` 分隔、UTF-8 编码、无格式、可被 Excel/Python/pandas 直接 `pd.read_csv(sep='\t')` 读取** 的方式写入该 TXT。
>
> ✅ **不再需要人工排版的 `gaps_report.txt`(如带 `=====`、序号、对齐等)**
> ✅ **也不再需要额外的 `status.txt` 名称歧义** —— 统一命名为 `summary_gaps.txt`(或你指定的任意单一名字)
---
### ✅ 修改方案(精简、无冗余、完全符合你的要求)
| 项目 | 原逻辑 | 新逻辑 | 说明 |
|------|--------|--------|------|
| **TXT 文件名** | `REPORT_TXT`(美化) + `STATUS_TXT`(纯数据) | ✅ **仅一个文件:`SUMMARY_TXT = OUTPUT_XLSX.replace('.xlsx', '_gaps.txt')`** | 名称清晰:`fab2a_summary_gaps.txt` |
| **内容规则** | 两个文件各自生成 | ✅ **统一逻辑:**<br> • 无 `OVER_10S` → 写 `"Normal"`<br> • 有 `OVER_10S` → `df_over_10s.to_csv(..., sep='\t', header=True)` | 纯数据、零格式、零人工排版、可直读 |
| **删除代码** | 完整保留 `REPORT_TXT` 生成块 | ❌ **彻底删除全部 `REPORT_TXT` 相关代码(含 `f.write(...)` 美化逻辑)** | 消除冗余,减少体积与维护成本 |
---
### ✅ 最终精简版代码(仅保留 **1 个 TXT 文件**,逻辑最简)
```python
import pandas as pd
import numpy as np
import re
import os
import sys
import datetime
import logging
# ------------------------ ✅ 路径统一配置区(所有路径在此集中管理)------------------------
PATH_CONFIG = {
"INPUT_CSV": r"D:\CurveCheck\Result\History_NullValue_aaa.csv",
"OUTPUT_XLSX": r"D:\CurveCheck\Result\fab2a_summary.xlsx",
"LOG_DIR": r"D:\CurveCheck\Log", # 日志根目录(自动创建)
"SUMMARY_TXT": r"D:\CurveCheck\Result\fab2a_summary_gaps.txt", # ✅ 唯一 TXT:Normal 或 tab 分隔数据
}
# 创建必要目录
for key in ["LOG_DIR", "OUTPUT_XLSX", "SUMMARY_TXT"]:
path = PATH_CONFIG[key]
parent_dir = os.path.dirname(path) if key != "LOG_DIR" else path
os.makedirs(parent_dir, exist_ok=True)
# ------------------------ ✅ 日志初始化:按天生成 YYYYMMDD.log ------------------------
today_str = datetime.date.today().strftime("%Y%m%d")
log_file = os.path.join(PATH_CONFIG["LOG_DIR"], f"{today_str}.log")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler(log_file, encoding="utf-8"),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# ------------------------ 时间解析函数(宽松模式)------------------------
def parse_datetime(s):
if pd.isna(s) or str(s).strip() == '':
return pd.NaT
s_clean = re.sub(r'\s+', ' ', str(s).strip())
try:
return pd.to_datetime(s_clean, format="%Y/%m/%d %H:%M:%S")
except ValueError:
return pd.to_datetime(s_clean, errors='coerce')
# ------------------------ 步骤1:读取 CSV ------------------------
logger.info("✅ 步骤1:读取 CSV(轻量加载)...")
try:
df = pd.read_csv(
PATH_CONFIG["INPUT_CSV"],
usecols=[0, 1, 2],
header=0,
names=['datetime_str', 'tagname', 'value'],
dtype=str,
keep_default_na=False
)
except Exception as e:
logger.error(f"❌ 读取 CSV 失败:{e}")
sys.exit(1)
for col in ['datetime_str', 'tagname', 'value']:
if col not in df.columns:
df[col] = ''
# ------------------------ 步骤2:标准化 tagname ------------------------
df['tagname_clean'] = df['tagname'].fillna('').str.strip().replace('', 'UNKNOWN')
# ------------------------ 步骤3:解析 datetime ------------------------
df['datetime'] = df['datetime_str'].apply(parse_datetime)
# ------------------------ 步骤4:NULL 间隙分析 ------------------------
logger.info("✅ 步骤2:执行 NULL 间隙分析...")
gap_records = []
def is_null_val(v):
return pd.isna(v) or v == '' or str(v).strip().upper() == 'NULL'
tags = df['tagname_clean'].unique()
for tag in tags:
mask = df['tagname_clean'] == tag
df_tag_dt = df[mask].dropna(subset=['datetime'])[['datetime', 'value']].copy()
if len(df_tag_dt) == 0:
continue
df_tag_dt = df_tag_dt.reset_index()
df_tag_dt['is_null'] = df_tag_dt['value'].apply(is_null_val)
for i in range(len(df_tag_dt)):
if not df_tag_dt.iloc[i]['is_null']:
continue
null_dt = df_tag_dt.iloc[i]['datetime']
null_orig_idx = df_tag_dt.iloc[i]['index']
next_valid_dt = None
next_orig_idx = None
for j in range(i + 1, len(df_tag_dt)):
if not df_tag_dt.iloc[j]['is_null']:
next_valid_dt = df_tag_dt.iloc[j]['datetime']
next_orig_idx = df_tag_dt.iloc[j]['index']
break
if next_valid_dt is not None:
gap_sec = (next_valid_dt - null_dt).total_seconds()
category = "OVER_10S" if gap_sec > 10 else "WITHIN_10S"
gap_records.append({
'tagname': tag,
'null_datetime': null_dt,
'next_valid_datetime': next_valid_dt,
'gap_seconds': round(gap_sec, 3),
'gap_category': category,
'null_row_index_in_original': null_orig_idx,
'next_row_index_in_original': next_orig_idx,
})
else:
gap_records.append({
'tagname': tag,
'null_datetime': null_dt,
'next_valid_datetime': pd.NaT,
'gap_seconds': np.nan,
'gap_category': np.nan,
'null_row_index_in_original': null_orig_idx,
'next_row_index_in_original': np.nan,
})
# ------------------------ 步骤5:生成 DataFrame & 写入 Excel ------------------------
df_gaps = pd.DataFrame(gap_records)
if len(df_gaps) == 0:
logger.warning("⚠️ 未检测到任何 NULL 行(或所有 datetime 解析失败)")
df_gaps = pd.DataFrame(columns=[
'tagname', 'null_datetime', 'next_valid_datetime', 'gap_seconds',
'gap_category', 'null_row_index_in_original', 'next_row_index_in_original'
])
# ✅ 写入 Excel(xlsxwriter)
try:
with pd.ExcelWriter(PATH_CONFIG["OUTPUT_XLSX"], engine='xlsxwriter') as writer:
df_gaps.to_excel(writer, sheet_name='Summary_Gaps', index=False)
logger.info(f"✅ Excel 已保存:{PATH_CONFIG['OUTPUT_XLSX']}")
except Exception as e:
logger.error(f"❌ Excel 写入失败:{e}")
sys.exit(1)
# ------------------------ ✅ 唯一 TXT:summary_gaps.txt(Normal 或 tab 数据)------------------------
df_over_10s = df_gaps[df_gaps['gap_category'] == 'OVER_10S'].copy()
df_over_10s = df_over_10s.sort_values(by=['tagname', 'null_datetime']).reset_index(drop=True)
try:
if len(df_over_10s) == 0:
with open(PATH_CONFIG["SUMMARY_TXT"], "w", encoding="utf-8") as f:
f.write("Normal")
logger.info("✅ summary_gaps.txt → Normal")
else:
df_over_10s.to_csv(
PATH_CONFIG["SUMMARY_TXT"],
sep='\t',
index=False,
header=True,
encoding='utf-8'
)
logger.info(f"✅ summary_gaps.txt → {len(df_over_10s)} 条 OVER_10S 记录(tab 分隔)")
except Exception as e:
logger.error(f"❌ summary_gaps.txt 写入失败:{e}")
# ------------------------ ✅ 统计日志 ------------------------
total = len(df_gaps)
over_10 = len(df_gaps[df_gaps['gap_category'] == 'OVER_10S'])
within_10 = len(df_gaps[df_gaps['gap_category'] == 'WITHIN_10S'])
isolated = len(df_gaps[df_gaps['gap_category'].isna()])
logger.info(f"✅ 完成!Summary_Gaps 已保存至:{PATH_CONFIG['OUTPUT_XLSX']}")
logger.info(f" ├─ 总记录数:{total}")
logger.info(f" ├─ gap > 10s:{over_10}")
logger.info(f" ├─ gap ≤ 10s:{within_10}")
logger.info(f" └─ 孤立 NULL:{isolated}")
```
---
### ✅ 输出效果示例(完全符合你的要求)
| 场景 | `fab2a_summary_gaps.txt` 内容 |
|------|-------------------------------|
| **无异常** | `Normal`(纯一行,无空格、无 BOM) |
| **有异常** | ```<br>tagname null_datetime next_valid_datetime gap_seconds gap_category null_row_index_in_original next_row_index_in_original<br>PT-101 2024-05-20 10:00:00 2024-05-20 10:00:15 15.0 OVER_10S 123 129<br>FT-205 2024-05-20 11:30:05 2024-05-20 11:30:22 17.0 OVER_10S 456 463<br>``` |
✅ 可直接用 Excel “从文本/CSV 导入” → 选择“分隔符号 → Tab”
✅ 可用 Python 一键读取:
```python
df = pd.read_csv("fab2a_summary_gaps.txt", sep='\t', encoding='utf-8')
```
---
### ✅ 为什么这是最优解?
- 🚫 **零冗余**:删掉了全部人工排版逻辑(`f.write("="*60)` 等),节省约 20 行代码 + 打包体积
- ✅ **单点真相(Single Source of Truth)**:一个 TXT = 一个语义(Normal / Data),运维/监控脚本只需检查这一个文件
- 💡 **向后兼容**:`.txt` 格式比 `.xlsx` 更易被 Shell/PLC/旧系统解析,且不依赖 Excel
- ⚡ **极致轻量**:无 `openpyxl`、无 `colorama`、无 `jinja2` —— 仅 `pandas`+`numpy`+`xlsxwriter`(≈ 3MB 总依赖)
---