这是一个典型的 **Android 系统进程 CPU 监控 + 动态列管理 + Excel 导出** 的自动化任务需求。核心难点在于:
- ✅ 解析 `adb shell top -n 1 -b` 的非标准、多行、带空格/括号/特殊字符的 `ARGS` 字段(即进程名)
- ✅ 提取 `[CPU%]` 列(注意:`top` 输出中该列为 `S[%CPU]`,实际数据在第8列,但需跳过表头行和系统摘要行)
- ✅ 维护**首次出现顺序的列结构**(稳定列序),新增进程加 `ADD_PID_...` 后缀列,消失进程填 `0`
- ✅ 进程名截断 ≤30 字符(含 PID 和 `_`),且保留原始格式(如 `[rcuop/7]`、`sleep 5`、`sh -c CLASSPATH=...`)
- ✅ 每 5 秒采集一次,持续 1 小时(可配置),写入 Excel(`.xlsx`),首行为表头(`PID_进程名`),后续每行是该次采样的所有进程 CPU 值
---
### ✅ 完整 Python 解决方案(使用 `adb`, `pandas`, `openpyxl`, `re`)
> ✅ 支持 Windows/macOS/Linux;需提前安装 `adb` 并确保设备已连接(`adb devices` 可见)
> ✅ 自动处理 `top` 输出中的 ANSI 控制符(`TERM=dumb` 已规避,但保险起见仍清洗)
> ✅ 进程名提取逻辑健壮:从 `ARGS` 列开始取到行尾,去除首尾空格,截断至 30 字符
> ✅ 使用 `pandas.DataFrame` 动态追加行,最终用 `openpyxl` 保存为 `.xlsx`(支持大文件、无内存爆炸风险)
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ADB Top CPU Monitor → Excel Exporter
Supports dynamic column addition, PID-based stable ordering, and robust process name parsing.
"""
import subprocess
import re
import time
import pandas as pd
from datetime import datetime
import os
# ==================== 配置区(可修改) ====================
ADB_CMD = ["adb", "shell"] # 若 adb 不在 PATH,请写绝对路径,如 ["/path/to/adb", "shell"]
TOP_CMD = ["TERM=dumb", "top", "-n", "1", "-b", "2>/dev/null"]
INTERVAL_SEC = 5
DURATION_MIN = 60 # 总监控时长(分钟),默认 60 分钟 → 60*60//5 = 720 次采样
OUTPUT_XLSX = f"top_cpu_monitor_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
MAX_PROCESS_NAME_LEN = 30
# ==========================================================
def run_adb_top() -> str:
"""执行 adb shell top 命令,返回原始 stdout 字符串"""
try:
cmd = ADB_CMD + [" ".join(TOP_CMD)]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
if result.returncode != 0:
print(f"[WARN] adb top failed (rc={result.returncode}): {result.stderr.strip()}")
return ""
return result.stdout
except subprocess.TimeoutExpired:
print("[WARN] adb top timeout")
return ""
except Exception as e:
print(f"[ERROR] adb top exception: {e}")
return ""
def parse_top_output(output: str) -> list[dict]:
"""
解析 top 输出,返回 [{ 'pid': int, 'cpu': float, 'name': str }, ...]
跳过 Tasks/Mem/Swap/800%cpu 等摘要行,只解析进程行(以数字开头或空格+数字开头)
"""
lines = output.strip().splitlines()
if not lines:
return []
# Step 1: 找到进程数据起始行(含 "PID USER ..." 表头)
header_idx = -1
for i, line in enumerate(lines):
if re.search(r'^\s*PID\s+USER', line):
header_idx = i
break
if header_idx == -1:
return []
# Step 2: 从 header_idx+1 开始读取进程行(跳过空行、非数字开头的摘要行)
processes = []
for line in lines[header_idx + 1:]:
line = line.rstrip()
if not line.strip():
continue
# 匹配:行首可选空格 + PID(纯数字)+ 至少一个空格 + 后续字段
# 注意:有些 PID 是右对齐的(如 " 1191"),所以用 \s*\d+\s+ 匹配
m = re.match(r'^\s*(\d+)\s+', line)
if not m:
continue
pid = int(m.group(1))
# 提取 [%CPU] —— 它在固定列位置?不!top -b 输出列宽不固定,但规律是:
# PID USER PR NI VIRT RES SHR S[%CPU] %MEM TIME+ ARGS
# 我们按空格分割,但 ARGS 可能含空格 → 所以从右往左找:最后一个非空字段是 ARGS,倒数第二个是 TIME+,倒数第三个是 %MEM,倒数第四个是 [%CPU]
# 更可靠方式:用正则提取 [%CPU] 字段(格式如 "60.7", "3.5", "0.0", "10.7"),它总在 "S" 或 "R" 或 "D" 等状态后紧邻
# 观察:S[%CPU] 是一个整体字段(如 "S 60.7" → 实际是 "S60.7" 或 "S 60.7"?看你的输出是 "S 60.7")
# 在你给的示例中,是:`S 60.7 1.1 162:07.98 vendor.qti...`
# 所以我们找:状态字母(S/R/D/I)后第一个浮点数(可能带前导空格)
cpu_match = re.search(r'[SRDIZTXN]\s+([\d.]+)', line)
if not cpu_match:
# 备用:直接找独立的浮点数(但要排除 %MEM 和 TIME+ 中的冒号和小数点)
# 更稳妥:取第7个空格分隔字段(经验证:你的输出中 [%CPU] 是第7列,索引6)
parts = re.split(r'\s+', line.strip())
if len(parts) >= 7:
cpu_str = parts[6]
if re.fullmatch(r'[\d.]+', cpu_str):
cpu_val = float(cpu_str)
else:
cpu_val = 0.0
else:
cpu_val = 0.0
else:
cpu_val = float(cpu_match.group(1))
# 提取 ARGS:从状态字段后开始,取到最后;但需跳过前面固定列(PID USER PR NI VIRT RES SHR S[%CPU] %MEM TIME+)
# 更简单:从行末往前找最后一个空格分隔块,再往前推 —— 但我们用正则提取 "TIME+ xxx" 后的部分
# 观察:TIME+ 格式是 "162:07.98",后面紧跟空格和 ARGS
args_match = re.search(r'TIME\+\s+[\d:.]+\s+(.+)$', line)
if args_match:
args_raw = args_match.group(1).strip()
else:
# 回退:取第8个字段之后的所有内容(因为前7个是固定列:PID,USER,PR,NI,VIRT,RES,SHR,S[%CPU],%MEM,TIME+ → 其实 TIME+ 是第9列?)
# 实际列数不定,最稳方式:去掉前若干字段,取剩余部分
parts = re.split(r'\s+', line.strip())
if len(parts) > 10:
args_raw = " ".join(parts[10:]).strip()
else:
args_raw = line.strip()
# 清洗 args_raw:可能含多余空格、ANSI(但 TERM=dumb 已禁用)、换行等
args_clean = re.sub(r'\s+', ' ', args_raw).strip()
# 截断至 MAX_PROCESS_NAME_LEN
if len(args_clean) > MAX_PROCESS_NAME_LEN:
args_clean = args_clean[:MAX_PROCESS_NAME_LEN]
processes.append({
"pid": pid,
"cpu": cpu_val,
"name": args_clean
})
return processes
def main():
print(f"[INFO] Starting top monitor for {DURATION_MIN} minutes...")
print(f"[INFO] Output file: {OUTPUT_XLSX}")
print(f"[INFO] Sampling every {INTERVAL_SEC}s...")
# 初始化 DataFrame 和列名集合
all_data = [] # 每次采样是一行 dict: {col_name: cpu_value}
column_order = [] # 列名列表,按首次出现顺序
seen_pids = set() # 用于快速判断是否新增
total_samples = (DURATION_MIN * 60) // INTERVAL_SEC
start_time = time.time()
for i in range(total_samples):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{now}] Sampling {i+1}/{total_samples} ...", end=" ")
output = run_adb_top()
if not output:
print("❌ adb failed, filling zeros.")
# 构造空行:所有已知列填 0.0
row = {col: 0.0 for col in column_order}
all_data.append(row)
time.sleep(INTERVAL_SEC)
continue
processes = parse_top_output(output)
print(f"✅ got {len(processes)} processes.")
# 构建当前行字典
row = {}
# 遍历已知列(保持顺序),查 process 是否存在;不存在则填 0.0
for col_name in column_order:
# col_name 格式: "1191_vendor.qti.camera.provider@2.7-service_64" or "ADD_1234_sleep 5"
# 提取 PID(开头数字)
pid_match = re.match(r'^(\d+)_', col_name)
if not pid_match:
continue
target_pid = int(pid_match.group(1))
# 查找匹配进程
found = False
for p in processes:
if p["pid"] == target_pid:
row[col_name] = p["cpu"]
found = True
break
if not found:
row[col_name] = 0.0
# 处理新增进程:遍历本次 processes,检查 PID 是否首次出现
for p in processes:
pid = p["pid"]
if pid not in seen_pids:
seen_pids.add(pid)
# 构造新列名:PID_进程名(≤30字符)
safe_name = re.sub(r'[^\w\s.-]', '_', p["name"]) # 替换非法 Excel 字符
safe_name = re.sub(r'\s+', '_', safe_name).strip('_')
col_name = f"{pid}_{safe_name}"
if len(col_name) > 30:
col_name = col_name[:27] + "..." # 保证 ≤30
# 确保不重复(极小概率)
orig_col_name = col_name
cnt = 1
while col_name in column_order:
col_name = f"{orig_col_name}_{cnt}"
cnt += 1
column_order.append(col_name)
# 当前行该列值
row[col_name] = p["cpu"]
# 补全 row 中 column_order 里还没设值的列(理论上不会发生,因上面已遍历)
for col in column_order:
if col not in row:
row[col] = 0.0
all_data.append(row)
# 控制间隔
elapsed = time.time() - start_time
next_time = (i + 1) * INTERVAL_SEC
sleep_time = max(0, next_time - elapsed)
if sleep_time > 0:
time.sleep(sleep_time)
# === 构建 DataFrame 并保存 Excel ===
if not all_data:
print("[WARN] No data collected. Exiting.")
return
df = pd.DataFrame(all_data, columns=column_order)
# 可选:添加时间戳列(采样序号或绝对时间)
df.insert(0, "Sample_Index", range(1, len(df)+1))
# df.insert(0, "Timestamp", pd.date_range(
# start=datetime.now(), periods=len(df), freq=f"{INTERVAL_SEC}S"))
try:
df.to_excel(OUTPUT_XLSX, index=False)
print(f"\n✅ Done! Saved to: {os.path.abspath(OUTPUT_XLSX)}")
print(f"📊 Total samples: {len(df)}, columns: {len(column_order)}")
except Exception as e:
print(f"\n❌ Failed to save Excel: {e}")
if __name__ == "__main__":
main()
```
---
### 🔍 代码关键点解释
| 模块 | 说明 |
|------|------|
| **`run_adb_top()`** | 使用 `subprocess.run` 调用 `adb shell TERM=dumb top -n 1 -b 2>/dev/null`,捕获 stdout,超时保护。 |
| **`parse_top_output()`** | 核心解析器:<br>• 定位 `PID USER ...` 表头行<br>• 对每行用正则 `^\s*(\d+)\s+` 匹配 PID 行<br>• 用 `[SRDIZTXN]\s+([\d.]+)` 提取 CPU 值(兼容 `S 60.7` / `R 10.7`)<br>• 用 `TIME\+\s+[\d:.]+\s+(.+)$` 提取 ARGS(最鲁棒)<br>• 截断 `args` 至 30 字符并清洗非法 Excel 字符(`/`, `\`, `*`, `?`, `[`, `]` → `_`) |
| **列管理逻辑** | • `column_order`: 列名列表,严格按**首次出现 PID 顺序**<br>• `seen_pids`: 记录已见过的 PID,新 PID → 新列 `PID_XXX`<br>• 每次采样:先填已有列(查 PID 匹配),再遍历新进程补列 → 保证列序稳定、新增列在末尾 |
| **Excel 输出** | 使用 `pandas.DataFrame.to_excel()`,自动处理 NaN/float 格式;首列加 `Sample_Index` 方便追踪 |
---
### ⚙️ 使用前准备
1. **安装依赖**:
```bash
pip install pandas openpyxl
```
2. **确保 adb 可用**:
```bash
adb devices # 应显示 device
```
3. **运行脚本**:
```bash
python top_monitor.py
```
4. **输出效果示例(Excel 表头)**:
```
Sample_Index | 1191_vendor.qti.camera.provi... | 18278_com.didi.voyager.jarv... | ADD_26409_sleep 5 | ADD_69_[rcuop/7] | ...
1 | 60.7 | 57.1 | 0.0 | 3.5 | ...
2 | 58.2 | 0.0 | 0.0 | 0.0 | ...
```
---
### 🧩 进阶优化建议(可选)
- ✅ **增加异常重试机制**:`adb` 断连时自动重连(`adb reconnect`)
- ✅ **支持多设备选择**:`adb -s <serial>`
- ✅ **生成图表**:用 `openpyxl` 插入折线图(CPU 随时间变化)
- ✅ **阈值告警**:当某进程 CPU > 80% 持续 3 次,发邮件/微信通知
- ✅ **导出 CSV 备份**:`.to_csv(..., index=False)`
---