你当前的 Python 脚本用于分析 CSV 中的 `NULL` 间隙(特别是时间序列中 `value` 为 NULL 的行,并计算其与下一个非 NULL 行之间的时间差),并输出 Excel 汇总表 + 可读 TXT 报告。现在你想**让该脚本每天自动运行多次(如每 10 分钟一次)**,并**将所有当日运行的日志统一写入一个以日期命名的日志文件(如 `2025-04-05.log`)**,便于用户每日集中查看运行状态、成功/失败信息、统计摘要等。
---
### ✅ 解决方案:添加「按天滚动的日志系统」
我们将:
- 使用标准库 `logging` 模块;
- 动态生成日志文件路径:`logs/YYYY-MM-DD.log`;
- 每次运行时自动追加日志(不覆盖);
- 记录关键节点:开始时间、步骤耗时、异常、统计结果;
- 避免重复导入/配置冲突(支持多次调用);
- **兼容 Windows 路径(你用的是 `D:\\...`)**;
- 日志级别设为 `INFO`(含详细信息),错误用 `ERROR`;
- 同时保留控制台输出(方便调试),但日志文件更规范、可归档。
---
### ✅ 修改后的完整代码(仅展示新增/修改部分,其余保持原逻辑不变)
> 🔔 **说明**:以下代码是对你原始脚本的**增量增强版**,你只需将下方 `# —— 新增:日志配置 ——` 开始的部分插入到你脚本最顶部(`import` 之后、`INPUT_CSV` 定义之前),并在关键执行点插入 `logger.info()` / `logger.error()` 即可。
```python
import pandas as pd
import numpy as np
import re
import logging
import os
from datetime import datetime
# —— 新增:日志配置 ——
LOG_DIR = "D:\\Class\\曲线导出检测\\logs" # ✅ 自定义日志目录(可改)
os.makedirs(LOG_DIR, exist_ok=True) # 确保目录存在
# 获取今日日期字符串,用于日志文件名
TODAY_STR = datetime.now().strftime("%Y-%m-%d")
LOG_FILE = os.path.join(LOG_DIR, f"{TODAY_STR}.log")
# 配置 logger(全局唯一,避免重复添加 handler)
logger = logging.getLogger("NullGapAnalyzer")
logger.setLevel(logging.INFO)
# 防止重复添加 handler(重要!否则每次运行日志会翻倍)
if not logger.handlers:
# 控制台输出(方便实时看)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 文件输出(按天)
file_handler = logging.FileHandler(LOG_FILE, encoding='utf-8')
file_handler.setLevel(logging.INFO)
# 统一日志格式
formatter = logging.Formatter(
"[%(asctime)s] %(levelname)-6s | %(message)s",
datefmt="%H:%M:%S"
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.addHandler(file_handler)
# ------------------------ 配置区 ------------------------
INPUT_CSV = "D:\\Class\\曲线导出检测\\History_NullValue_aaa.csv"
OUTPUT_XLSX = "D:\\Class\\曲线导出检测\\fab2a_summary.xlsx"
# 时间解析函数(宽松模式)→ 不变
def parse_datetime(s):
if pd.isna(s) or str(s).strip() == '':
return pd.NaT
s_clean = re.sub(r'\s+', ' ', str(s).strip())
try:
return pd.to_datetime(s_clean, format="%Y/%m/%d %H:%M:%S")
except ValueError:
return pd.to_datetime(s_clean, errors='coerce')
# ------------------------ 步骤1:读取 CSV ------------------------
logger.info(f"🚀 开始执行分析任务 | 输入文件:{os.path.basename(INPUT_CSV)}")
try:
print("✅ 步骤1:读取 CSV(轻量加载)...")
logger.info("步骤1:开始读取 CSV...")
df = pd.read_csv(INPUT_CSV, usecols=[0, 1, 2], header=0,
names=['datetime_str', 'tagname', 'value'],
dtype=str, keep_default_na=False)
for col in ['datetime_str', 'tagname', 'value']:
if col not in df.columns:
df[col] = ''
logger.info(f"步骤1:成功读取 {len(df)} 行数据")
# ------------------------ 步骤2:标准化 tagname ------------------------
df['tagname_clean'] = df['tagname'].fillna('').str.strip().replace('', 'UNKNOWN')
logger.info("步骤2:tagname 标准化完成")
# ------------------------ 步骤3:解析 datetime ------------------------
logger.info("步骤3:开始解析 datetime 字段...")
df['datetime'] = df['datetime_str'].apply(parse_datetime)
valid_dt_count = df['datetime'].notna().sum()
logger.info(f"步骤3:共解析 {valid_dt_count}/{len(df)} 行有效时间")
# ------------------------ 步骤4:NULL 间隙分析 ------------------------
logger.info("步骤4:开始 NULL 间隙分析...")
gap_records = []
def is_null_val(v):
if pd.isna(v) or v == '' or str(v).strip().upper() == 'NULL':
return True
return False
tags = df['tagname_clean'].unique()
for tag in tags:
mask = df['tagname_clean'] == tag
df_tag_dt = df[mask].dropna(subset=['datetime'])[['datetime', 'value']].copy()
if len(df_tag_dt) == 0:
continue
df_tag_dt = df_tag_dt.reset_index()
df_tag_dt['is_null'] = df_tag_dt['value'].apply(is_null_val)
for i in range(len(df_tag_dt)):
if not df_tag_dt.iloc[i]['is_null']:
continue
null_dt = df_tag_dt.iloc[i]['datetime']
null_orig_idx = df_tag_dt.iloc[i]['index']
next_valid_dt = None
next_orig_idx = None
for j in range(i + 1, len(df_tag_dt)):
if not df_tag_dt.iloc[j]['is_null']:
next_valid_dt = df_tag_dt.iloc[j]['datetime']
next_orig_idx = df_tag_dt.iloc[j]['index']
break
if next_valid_dt is not None:
gap_sec = (next_valid_dt - null_dt).total_seconds()
category = "OVER_10S" if gap_sec > 10 else "WITHIN_10S"
gap_records.append({
'tagname': tag,
'null_datetime': null_dt,
'next_valid_datetime': next_valid_dt,
'gap_seconds': round(gap_sec, 3),
'gap_category': category,
'null_row_index_in_original': null_orig_idx,
'next_row_index_in_original': next_orig_idx,
})
else:
gap_records.append({
'tagname': tag,
'null_datetime': null_dt,
'next_valid_datetime': pd.NaT,
'gap_seconds': np.nan,
'gap_category': np.nan,
'null_row_index_in_original': null_orig_idx,
'next_row_index_in_original': np.nan,
})
df_gaps = pd.DataFrame(gap_records)
if len(df_gaps) == 0:
logger.warning("⚠️ 未检测到任何 NULL 行(或所有 datetime 解析失败)")
df_gaps = pd.DataFrame(columns=[
'tagname', 'null_datetime', 'next_valid_datetime', 'gap_seconds',
'gap_category', 'null_row_index_in_original', 'next_row_index_in_original'
])
# ------------------------ 步骤5:写入 Excel ------------------------
logger.info("步骤5:写入 Excel 汇总表...")
with pd.ExcelWriter(OUTPUT_XLSX, engine='openpyxl') as writer:
df_gaps.to_excel(writer, sheet_name='Summary_Gaps', index=False)
logger.info(f"✅ Excel 已保存至:{os.path.basename(OUTPUT_XLSX)}")
# ------------------------ 步骤6:生成 TXT 报告 ------------------------
REPORT_TXT = OUTPUT_XLSX.replace('.xlsx', '_gaps_report.txt')
logger.info(f"步骤6:生成 TXT 报告 → {os.path.basename(REPORT_TXT)}")
df_over_10s = df_gaps[df_gaps['gap_category'] == 'OVER_10S'].copy()
df_over_10s = df_over_10s.sort_values(by=['tagname', 'null_datetime']).reset_index(drop=True)
with open(REPORT_TXT, 'w', encoding='utf-8') as f:
if len(df_over_10s) == 0:
f.write("无异常情况\n")
else:
f.write("异常曲线:附上大于10s的数据\n")
f.write("=" * 60 + "\n")
f.write(f"{'序号':<4} {'Tag名':<12} {'NULL时间':<22} {'下一有效时间':<22} {'间隔(秒)':<10}\n")
f.write("-" * 60 + "\n")
for idx, row in df_over_10s.iterrows():
null_time_str = row['null_datetime'].strftime("%Y/%m/%d %H:%M:%S") if pd.notna(row['null_datetime']) else "N/A"
next_time_str = row['next_valid_datetime'].strftime("%Y/%m/%d %H:%M:%S") if pd.notna(row['next_valid_datetime']) else "N/A"
gap_sec = row['gap_seconds'] if pd.notna(row['gap_seconds']) else "N/A"
f.write(f"{idx+1:<4} {row['tagname']:<12} {null_time_str:<22} {next_time_str:<22} {gap_sec:<10}\n")
f.write("-" * 60 + "\n")
f.write(f"总计发现 {len(df_over_10s)} 处 gap > 10 秒的异常点。\n")
logger.info(f"✅ TXT 报告已保存:{os.path.basename(REPORT_TXT)}")
# 📊 最终统计摘要(写入日志 + 控制台)
total_gaps = len(df_gaps)
over_10s = len(df_gaps[df_gaps['gap_category'] == 'OVER_10S'])
within_10s = len(df_gaps[df_gaps['gap_category'] == 'WITHIN_10S'])
isolated = len(df_gaps[df_gaps['gap_category'].isna()])
summary = (
f"📊 本次运行统计:"
f"总 NULL 间隙={total_gaps} | "
f">10s={over_10s} | "
f"≤10s={within_10s} | "
f"孤立 NULL={isolated}"
)
logger.info(summary)
print(f"✅ 完成!Summary_Gaps 已保存至:{OUTPUT_XLSX}")
print(f" ├─ {summary}")
except Exception as e:
error_msg = f"❌ 运行失败:{str(e)}"
logger.error(error_msg)
print(error_msg)
finally:
logger.info("🔚 当前轮次执行结束\n")
```
---
### ✅ 日志效果示例(`D:\Class\曲线导出检测\logs\2025-04-05.log` 内容节选):
```log
[10:23:15] INFO | 🚀 开始执行分析任务 | 输入文件:History_NullValue_aaa.csv
[10:23:15] INFO | 步骤1:开始读取 CSV...
[10:23:16] INFO | 步骤1:成功读取 12480 行数据
[10:23:16] INFO | 步骤2:tagname 标准化完成
[10:23:16] INFO | 步骤3:开始解析 datetime 字段...
[10:23:17] INFO | 步骤3:共解析 12475/12480 行有效时间
[10:23:19] INFO | 步骤4:开始 NULL 间隙分析...
[10:23:22] INFO | 步骤5:写入 Excel 汇总表...
[10:23:22] INFO | ✅ Excel 已保存至:fab2a_summary.xlsx
[10:23:22] INFO | 步骤6:生成 TXT 报告 → fab2a_summary_gaps_report.txt
[10:23:22] INFO | ✅ TXT 报告已保存:fab2a_summary_gaps_report.txt
[10:23:22] INFO | 📊 本次运行统计:总 NULL 间隙=32 | >10s=5 | ≤10s=25 | 孤立 NULL=2
[10:23:22] INFO | 🔚 当前轮次执行结束
[10:33:15] INFO | 🚀 开始执行分析任务 | 输入文件:History_NullValue_aaa.csv
[10:33:15] INFO | 步骤1:开始读取 CSV...
...
```
✅ 每次运行都会追加到当天日志,**不会覆盖历史记录**;
✅ 用户只需每天打开 `2025-04-05.log` 就能看到全天所有运行快照;
✅ 错误会高亮 `ERROR`,便于排查;
✅ 支持多进程/定时任务并发(`logging` 是线程安全的)。
---
### ✅ 如何实现“每 10 分钟运行一次”?(补充建议)
你可用以下任一方式调度(Windows 推荐):
| 方式 | 说明 |
|------|------|
| **Windows 任务计划程序** | 创建基本任务 → 触发器设为“每 10 分钟”,操作为“启动程序” → `python.exe D:\path\to\your_script.py` |
| **Python `schedule` + 后台服务** | 用 `schedule` 库循环 `every(10).minutes.do(main)`,配合 `pythonw.exe` 静默运行(需额外封装) |
| **第三方工具(如 Cronitor、Healthchecks)** | 更适合生产级监控+告警 |
> 💡 提示:若使用任务计划程序,请确保 Python 环境变量已配置,或在操作中指定完整路径:
> `D:\Python39\python.exe`
> 参数:`D:\Class\曲线导出检测\gap_analyzer.py`
---