<think>我们创建一个完整的Python脚本,该脚本将执行以下步骤:
1. 解析命令行参数(项目名,ref平台,dut平台)
2. 创建项目目录和result子目录
3. 检查ref和dut目录下是否有文件,没有则报错
4. 在result目录下创建Excel工作簿(项目名+日期)
5. 在ref和dut目录中查找bugreport文件(支持zip解压或直接文件)
6. 根据平台加载关键字文件
7. 从bugreport文件中提取数据(处理特殊关键字的时间差值计算)
8. 将提取的数据写入Excel的对应工作表(dut_bootimg和ref_bootimg)
9. 根据是否同一平台,选择使用公共关键字或平台关键字创建比较工作表
10. 在比较工作表中计算阶段时间和差值,并对差值大于300的单元格标红
注意:我们将处理三个特殊关键字(BOOTING_KEY, FIRST_STAGE_KEY, SECOND_STAGE_KEY)的时间差值计算。
由于代码较长,我们将分步骤实现,并添加详细注释。
以下是完整的脚本代码:
</think>
下面是一个完整的 Python 脚本解决方案,满足您所有的需求。这个脚本会处理 bugreport 文件,提取时间戳数据,计算差值,并在 Excel 中生成详细的比较报告。
```python
#!/usr/bin/env python3
import os
import sys
import argparse
import zipfile
import re
import shutil
from datetime import datetime
import openpyxl
from openpyxl.styles import PatternFill
import glob
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# 特殊关键字常量
BOOTING_KEY = "Booting Linux on physical"
FIRST_STAGE_KEY = "init first stage started!"
SECOND_STAGE_KEY = "init second stage started!"
def create_project_directories(project_name):
"""创建项目目录结构"""
project_dir = os.path.join(os.getcwd(), project_name)
result_dir = os.path.join(project_dir, 'result')
# 创建主目录和result子目录
os.makedirs(project_dir, exist_ok=True)
os.makedirs(result_dir, exist_ok=True)
logger.info(f"✅ 创建项目目录: {project_dir}")
logger.info(f"└── result")
return project_dir, result_dir
def check_directories():
"""检查ref和dut目录是否有文件"""
ref_dir = os.path.join(os.getcwd(), 'ref')
dut_dir = os.path.join(os.getcwd(), 'dut')
errors = []
# 检查ref目录
if not os.path.exists(ref_dir) or not os.path.isdir(ref_dir):
errors.append(f"❌ ref目录不存在: {ref_dir}")
elif not os.listdir(ref_dir):
errors.append(f"❌ ref目录为空: {ref_dir}")
# 检查dut目录
if not os.path.exists(dut_dir) or not os.path.isdir(dut_dir):
errors.append(f"❌ dut目录不存在: {dut_dir}")
elif not os.listdir(dut_dir):
errors.append(f"❌ dut目录为空: {dut_dir}")
if errors:
for error in errors:
logger.error(error)
sys.exit(1)
logger.info("✅ ref和dut目录检查通过")
return ref_dir, dut_dir
def create_excel_workbook(result_dir, project_name):
"""创建Excel工作簿"""
today = datetime.now().strftime("%Y%m%d")
excel_file = os.path.join(result_dir, f"{project_name}_{today}.xlsx")
wb = openpyxl.Workbook()
# 创建工作表
wb.create_sheet("comparison", 0) # 第一张表
wb.create_sheet("dut_bootimg", 1) # 第二张表
wb.create_sheet("ref_bootimg", 2) # 第三张表
# 删除默认创建的工作表
if "Sheet" in wb.sheetnames:
del wb["Sheet"]
wb.save(excel_file)
logger.info(f"📊 创建Excel工作簿: {excel_file}")
return excel_file
def find_and_extract_bugreport(directory):
"""查找并解压bugreport文件"""
# 查找bugreport开头的文件
bugreport_files = glob.glob(os.path.join(directory, 'bugreport*'))
if not bugreport_files:
logger.error(f"❌ 在 {directory} 中未找到bugreport文件")
return None
# 使用第一个找到的bugreport文件
bugreport_file = bugreport_files[0]
logger.info(f"🔍 找到bugreport文件: {bugreport_file}")
# 创建解压目录
extract_dir = os.path.join(directory, "extracted")
os.makedirs(extract_dir, exist_ok=True)
# 如果是ZIP文件则解压
if zipfile.is_zipfile(bugreport_file):
try:
with zipfile.ZipFile(bugreport_file, 'r') as zip_ref:
zip_ref.extractall(extract_dir)
logger.info(f"📦 解压文件到: {extract_dir}")
except zipfile.BadZipFile:
logger.error(f"❌ 文件不是有效的ZIP格式: {bugreport_file}")
return None
else:
# 如果不是ZIP文件,直接复制到解压目录
shutil.copy(bugreport_file, os.path.join(extract_dir, os.path.basename(bugreport_file)))
logger.info(f"📄 复制非压缩文件到: {extract_dir}")
# 在解压目录中查找bugreport文件
extracted_files = []
for root, _, files in os.walk(extract_dir):
for file in files:
if file.lower().startswith('bugreport'):
extracted_files.append(os.path.join(root, file))
if not extracted_files:
logger.error(f"❌ 在 {extract_dir} 中未找到解压后的bugreport文件")
return None
# 返回第一个匹配的文件
return extracted_files[0]
def load_keywords(platform, script_dir):
"""加载平台特定的关键字"""
platform_map = {
"MTK": "mtk_keyword.txt",
"QCOM": "qcom_keyword.txt",
"SPRD": "sprd_keyword.txt"
}
filename = platform_map.get(platform)
if not filename:
logger.error(f"❌ 不支持的平台: {platform}")
return None
keyword_file = os.path.join(script_dir, filename)
if not os.path.exists(keyword_file):
logger.error(f"❌ 关键字文件不存在: {keyword_file}")
return None
try:
with open(keyword_file, 'r') as f:
keywords = [line.strip() for line in f.readlines() if line.strip()]
except Exception as e:
logger.error(f"❌ 读取关键字文件错误: {e}")
return None
logger.info(f"📝 从 {keyword_file} 加载 {len(keywords)} 个关键字")
return keywords
def load_common_keywords(script_dir):
"""加载公共关键字"""
keyword_file = os.path.join(script_dir, "common_keyword.txt")
if not os.path.exists(keyword_file):
logger.error(f"❌ 公共关键字文件不存在: {keyword_file}")
return None
try:
with open(keyword_file, 'r') as f:
keywords = [line.strip() for line in f.readlines() if line.strip()]
except Exception as e:
logger.error(f"❌ 读取公共关键字文件错误: {e}")
return None
logger.info(f"📝 从 {keyword_file} 加载 {len(keywords)} 个公共关键字")
return keywords
def parse_timestamp(timestamp_str, base_year=datetime.now().year):
"""解析日志时间戳字符串为datetime对象"""
try:
# 格式: "03-04 03:33:58.101"
# 将时间戳字符串分割为日期和时间部分
parts = timestamp_str.split()
if len(parts) < 2:
return None
date_part = parts[0]
time_part = parts[1]
# 处理日期格式 (月份-日期)
month, day = map(int, date_part.split('-'))
# 处理时间部分 (时:分:秒.毫秒)
time_parts = time_part.split(':')
hour = int(time_parts[0])
minute = int(time_parts[1])
# 处理秒和毫秒
sec_parts = time_parts[2].split('.')
second = int(sec_parts[0])
millisecond = int(sec_parts[1]) if len(sec_parts) > 1 else 0
# 创建datetime对象
dt = datetime(base_year, month, day, hour, minute, second, millisecond * 1000)
return dt
except Exception as e:
logger.error(f"解析时间戳错误: {timestamp_str}, 错误: {e}")
return None
def extract_timings_from_file(file_path, keywords):
"""从文件中提取关键字对应的时间值,处理特殊关键字"""
results = {}
timestamps = {}
if not os.path.exists(file_path):
logger.error(f"❌ 文件不存在: {file_path}")
return results
logger.info(f"🔎 在 {os.path.basename(file_path)} 中搜索关键字...")
try:
# 尝试多种编码打开文件
for encoding in ['utf-8', 'latin-1', 'cp1252']:
try:
with open(file_path, 'r', encoding=encoding) as f:
lines = f.readlines()
break
except UnicodeDecodeError:
continue
else:
# 所有编码都失败,使用二进制模式
with open(file_path, 'rb') as f:
lines = f.read().decode('utf-8', errors='ignore').splitlines()
except Exception as e:
logger.error(f"❌ 读取文件错误: {e}")
return results
# 收集所有关键字的时间戳
for line in lines:
for keyword in keywords:
if keyword in line:
# 提取时间戳部分(行首的前两个部分)
parts = line.strip().split()
if len(parts) < 2:
continue
timestamp_str = f"{parts[0]} {parts[1]}"
timestamp = parse_timestamp(timestamp_str)
if timestamp:
timestamps[keyword] = timestamp
logger.debug(f" 找到 {keyword} 时间戳: {timestamp_str}")
# 特殊处理:Booting关键字总是记录为0
results[BOOTING_KEY] = 0
logger.info(f" ✓ {BOOTING_KEY}: 设置为0 (特殊处理)")
# 处理每个关键字
for keyword in keywords:
if keyword == BOOTING_KEY:
# 已处理,跳过
continue
# 处理init first stage
if keyword == FIRST_STAGE_KEY:
booting_timestamp = timestamps.get(BOOTING_KEY)
first_stage_timestamp = timestamps.get(FIRST_STAGE_KEY)
if booting_timestamp and first_stage_timestamp:
# 计算与Booting的时间差(毫秒)
delta = (first_stage_timestamp - booting_timestamp).total_seconds() * 1000
results[keyword] = delta
logger.info(f" ✓ {FIRST_STAGE_KEY}: 差值计算 = {delta:.2f} ms (相对于Booting Linux)")
else:
results[keyword] = None
logger.warning(f" ⚠ {FIRST_STAGE_KEY}: 未找到或无法计算差值")
# 处理init second stage
elif keyword == SECOND_STAGE_KEY:
first_stage_timestamp = timestamps.get(FIRST_STAGE_KEY)
second_stage_timestamp = timestamps.get(SECOND_STAGE_KEY)
if first_stage_timestamp and second_stage_timestamp:
# 计算与First Stage的时间差(毫秒)
delta = (second_stage_timestamp - first_stage_timestamp).total_seconds() * 1000
results[keyword] = delta
logger.info(f" ✓ {SECOND_STAGE_KEY}: 差值计算 = {delta:.2f} ms (相对于init first stage)")
else:
results[keyword] = None
logger.warning(f" ⚠ {SECOND_STAGE_KEY}: 未找到或无法计算差值")
else:
# 普通关键字:尝试提取时间值
pattern = re.compile(rf"{re.escape(keyword)}.*?(\d+)", re.IGNORECASE)
match = None
for line in lines:
if keyword in line:
match = pattern.search(line)
if match:
break
if match:
try:
value = int(match.group(1))
results[keyword] = value
logger.info(f" ✓ {keyword}: {value} ms")
except (ValueError, IndexError):
results[keyword] = None
logger.warning(f" ⚠ {keyword}: 找到但无法解析数值")
else:
results[keyword] = None
logger.warning(f" ✗ {keyword}: 未找到")
# 确保所有关键字都有结果(包括未找到的)
for keyword in keywords:
if keyword not in results:
results[keyword] = None
return results
def write_to_excel_sheet(excel_file, sheet_name, data):
"""将数据写入Excel工作表"""
if not data:
logger.warning(f"⚠ 无数据可写入 {sheet_name} 工作表")
return
try:
wb = openpyxl.load_workbook(excel_file)
except Exception as e:
logger.error(f"❌ 加载Excel文件错误: {e}")
return
# 创建或选择工作表
if sheet_name in wb.sheetnames:
sheet = wb[sheet_name]
sheet.delete_rows(1, sheet.max_row) # 清空现有数据
else:
sheet = wb.create_sheet(sheet_name)
# 写入表头
sheet['A1'] = "关键字"
sheet['B1'] = "时间 (ms)"
# 写入数据
row = 2
for keyword, value in data.items():
sheet.cell(row=row, column=1, value=keyword)
sheet.cell(row=row, column=2, value=value if value is not None else "N/A")
row += 1
# 调整列宽
sheet.column_dimensions['A'].width = 40
sheet.column_dimensions['B'].width = 15
try:
wb.save(excel_file)
logger.info(f"📝 数据已写入 {sheet_name} 工作表")
except Exception as e:
logger.error(f"❌ 保存Excel文件错误: {e}")
def create_comparison_sheet(excel_file, dut_data, ref_data, keywords):
"""创建比较工作表并计算差值,处理特殊关键字"""
try:
wb = openpyxl.load_workbook(excel_file)
except Exception as e:
logger.error(f"❌ 加载Excel文件错误: {e}")
return
# 创建或选择工作表
if "comparison" in wb.sheetnames:
sheet = wb["comparison"]
sheet.delete_rows(1, sheet.max_row) # 清空现有数据
else:
sheet = wb.create_sheet("comparison", 0)
# 设置表头
headers = [
"关键字",
"DUT值 (ms)",
"REF值 (ms)",
"DUT阶段时间 (ms)",
"REF阶段时间 (ms)",
"差值 (ms)"
]
for col, header in enumerate(headers, 1):
sheet.cell(row=1, column=col, value=header)
# 准备标红样式
red_fill = PatternFill(start_color="FFFF0000", end_color="FFFF0000", fill_type="solid")
# 初始化前值
prev_dut = 0
prev_ref = 0
# 处理每个关键字
for row_idx, keyword in enumerate(keywords, 2):
dut_value = dut_data.get(keyword)
ref_value = ref_data.get(keyword)
# 写入关键字和原始值
sheet.cell(row=row_idx, column=1, value=keyword)
sheet.cell(row=row_idx, column=2, value=dut_value if dut_value is not None else "N/A")
sheet.cell(row=row_idx, column=3, value=ref_value if ref_value is not None else "N/A")
# 计算阶段时间
dut_stage = None
ref_stage = None
diff = None
# 特殊处理:Booting关键字总是0
if keyword == BOOTING_KEY:
sheet.cell(row=row_idx, column=4, value=0)
sheet.cell(row=row_idx, column=5, value=0)
sheet.cell(row=row_idx, column=6, value=0)
# 更新前值
prev_dut = dut_value if dut_value is not None else 0
prev_ref = ref_value if ref_value is not None else 0
continue
# 处理init first stage
if keyword == FIRST_STAGE_KEY:
if dut_value is not None:
# 已经是与Booting的差值
dut_stage = dut_value
sheet.cell(row=row_idx, column=4, value=dut_stage)
prev_dut = dut_value # 更新为绝对时间
if ref_value is not None:
# 已经是与Booting的差值
ref_stage = ref_value
sheet.cell(row=row_idx, column=5, value=ref_stage)
prev_ref = ref_value # 更新为绝对时间
# 处理init second stage
elif keyword == SECOND_STAGE_KEY:
if dut_value is not None:
# 已经是与first stage的差值
dut_stage = dut_value
sheet.cell(row=row_idx, column=4, value=dut_stage)
prev_dut = dut_value # 更新为绝对时间
if ref_value is not None:
# 已经是与first stage的差值
ref_stage = ref_value
sheet.cell(row=row_idx, column=5, value=ref_stage)
prev_ref = ref_value # 更新为绝对时间
# 处理普通关键字
else:
if dut_value is not None:
dut_stage = dut_value - prev_dut if prev_dut is not None else None
if dut_stage is not None:
sheet.cell(row