<think>我们创建一个完整的Python脚本来满足上述需求。该脚本将执行以下步骤:
1. 解析命令行参数(项目名称,ref平台,dut平台)
2. 创建项目目录和result子目录
3. 检查ref和dut目录下是否存在文件(至少有一个bugreport文件)
4. 在result目录中创建一个以项目名和当前日期命名的Excel工作簿
5. 处理ref和dut目录下的bugreport文件(解压、查找、解析)
6. 根据平台关键字提取数据并写入Excel的对应表(ref_bootimg和dut_bootimg)
7. 根据平台是否相同,使用不同的关键字列表(common_keyword或平台关键字)创建比较表
8. 在比较表中计算阶段时间差和总差值,并对差值大于300的单元格标红
注意:我们假设bugreport文件可能是zip压缩包,也可能直接是文本文件。我们解压(如果是zip)后查找以"bugreport"开头的文件。
我们将使用以下库:
- argparse: 解析命令行参数
- os: 文件系统操作
- zipfile: 解压zip文件
- openpyxl: 操作Excel文件
- datetime: 获取当前日期
- re: 正则表达式搜索
- logging: 日志记录
特殊处理:
关键字"Booting Linux on physical":如果没有找到,记录0;如果找到,也记录0(因为它是基准点)
关键字"init first stage started!":记录与"Booting Linux on physical"的时间差(毫秒)
关键字"init second stage started!":记录与"init first stage started!"的时间差(毫秒)
步骤6中,如果平台不同则使用common_keyword.txt,否则使用对应平台的关键字文件。
我们将定义以下函数:
- create_project_directories(project_name)
- check_directories()
- create_excel_workbook(result_dir, project_name)
- find_and_extract_bugreport(directory)
- load_keywords(platform, script_dir) # 加载平台关键字
- load_common_keywords(script_dir) # 加载公共关键字
- extract_timings_from_file(file_path, keywords) # 从文件中提取时间信息
- write_to_excel_sheet(excel_file, sheet_name, data) # 将数据写入Excel的指定表
- create_comparison_sheet(excel_file, dut_data, ref_data, keywords, same_platform) # 创建比较表
注意:时间差计算需要从日志行中提取时间戳。我们假设时间戳格式为 "MM-DD HH:MM:SS.mmm"(如"03-04 03:33:58.101"),并转换为毫秒时间戳(相对于同一天)。由于日志时间都在同一天,我们可以将日期设置为当前年(或参考一个固定日期,因为同一天内比较)。
但是,注意:我们只需要计算两个时间戳之间的差值(毫秒),所以可以解析成datetime对象,然后相减得到毫秒差。
具体步骤:
1. 解析一行日志,提取时间戳字符串(前两个字段)。
2. 将时间戳字符串转换为datetime对象(假设年份为当前年,因为日志中没有年份)。
3. 计算两个datetime对象的差值,并转换为毫秒。
对于特殊关键字,我们首先找到它们的时间戳,然后计算差值。
对于普通关键字,我们直接提取关键字后的数字(单位毫秒),如果找不到则记录为None。
由于需求复杂,我们将分步骤实现。
下面开始编写代码:
</think>
下面我将提供一个完整的Python脚本解决方案,满足您所有需求。这个脚本会处理bugreport文件,提取时间戳数据,计算差值,并在Excel中生成详细的比较报告。
```python
#!/usr/bin/env python3
import os
import sys
import argparse
import zipfile
import re
import shutil
from datetime import datetime, timedelta
import openpyxl
from openpyxl.styles import PatternFill
from openpyxl.utils import get_column_letter
import logging
import glob
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# 特殊关键字常量
BOOTING_KEY = "Booting Linux on physical"
FIRST_STAGE_KEY = "init first stage started!"
SECOND_STAGE_KEY = "init second stage started!"
def create_project_directories(project_name):
"""创建项目目录结构"""
project_dir = os.path.join(os.getcwd(), project_name)
result_dir = os.path.join(project_dir, 'result')
# 创建主目录和result子目录
os.makedirs(project_dir, exist_ok=True)
os.makedirs(result_dir, exist_ok=True)
logger.info(f"✅ 创建项目目录: {project_dir}")
logger.info(f"└── result")
return project_dir, result_dir
def check_directories():
"""检查ref和dut目录是否有文件"""
ref_dir = os.path.join(os.getcwd(), 'ref')
dut_dir = os.path.join(os.getcwd(), 'dut')
errors = []
# 检查ref目录
if not os.path.exists(ref_dir) or not os.path.isdir(ref_dir):
errors.append(f"❌ ref目录不存在: {ref_dir}")
elif not os.listdir(ref_dir):
errors.append(f"❌ ref目录为空: {ref_dir}")
# 检查dut目录
if not os.path.exists(dut_dir) or not os.path.isdir(dut_dir):
errors.append(f"❌ dut目录不存在: {dut_dir}")
elif not os.listdir(dut_dir):
errors.append(f"❌ dut目录为空: {dut_dir}")
if errors:
for error in errors:
logger.error(error)
sys.exit(1)
logger.info("✅ ref和dut目录检查通过")
return ref_dir, dut_dir
def create_excel_workbook(result_dir, project_name):
"""创建Excel工作簿"""
today = datetime.now().strftime("%Y%m%d")
excel_file = os.path.join(result_dir, f"{project_name}_{today}.xlsx")
wb = openpyxl.Workbook()
# 创建工作表
wb.create_sheet("comparison", 0) # 第一张表
wb.create_sheet("dut_bootimg", 1) # 第二张表
wb.create_sheet("ref_bootimg", 2) # 第三张表
# 删除默认创建的工作表
if "Sheet" in wb.sheetnames:
del wb["Sheet"]
wb.save(excel_file)
logger.info(f"📊 创建Excel工作簿: {excel_file}")
return excel_file
def find_and_extract_bugreport(directory):
"""查找并解压bugreport文件"""
# 查找bugreport开头的文件
bugreport_files = glob.glob(os.path.join(directory, 'bugreport*'))
if not bugreport_files:
logger.error(f"❌ 在 {directory} 中未找到bugreport文件")
return None
# 使用第一个找到的bugreport文件
bugreport_file = bugreport_files[0]
logger.info(f"🔍 找到bugreport文件: {bugreport_file}")
# 创建解压目录
extract_dir = os.path.join(directory, "extracted")
os.makedirs(extract_dir, exist_ok=True)
# 如果是ZIP文件则解压
if zipfile.is_zipfile(bugreport_file):
try:
with zipfile.ZipFile(bugreport_file, 'r') as zip_ref:
zip_ref.extractall(extract_dir)
logger.info(f"📦 解压文件到: {extract_dir}")
except zipfile.BadZipFile:
logger.error(f"❌ 文件不是有效的ZIP格式: {bugreport_file}")
return None
else:
# 如果不是ZIP文件,直接复制到解压目录
shutil.copy(bugreport_file, os.path.join(extract_dir, os.path.basename(bugreport_file)))
logger.info(f"📄 复制非压缩文件到: {extract_dir}")
# 在解压目录中查找bugreport文件
extracted_files = []
for root, _, files in os.walk(extract_dir):
for file in files:
if file.lower().startswith('bugreport'):
extracted_files.append(os.path.join(root, file))
if not extracted_files:
logger.error(f"❌ 在 {extract_dir} 中未找到解压后的bugreport文件")
return None
# 返回第一个匹配的文件
return extracted_files[0]
def load_keywords(platform, script_dir):
"""加载平台特定的关键字"""
platform_map = {
"MTK": "mtk_keyword.txt",
"QCOM": "qcom_keyword.txt",
"SPRD": "sprd_keyword.txt"
}
filename = platform_map.get(platform)
if not filename:
logger.error(f"❌ 不支持的平台: {platform}")
return None
keyword_file = os.path.join(script_dir, filename)
if not os.path.exists(keyword_file):
logger.error(f"❌ 关键字文件不存在: {keyword_file}")
return None
try:
with open(keyword_file, 'r') as f:
keywords = [line.strip() for line in f.readlines() if line.strip()]
except Exception as e:
logger.error(f"❌ 读取关键字文件错误: {e}")
return None
logger.info(f"📝 从 {keyword_file} 加载 {len(keywords)} 个关键字")
return keywords
def load_common_keywords(script_dir):
"""加载公共关键字"""
keyword_file = os.path.join(script_dir, "common_keyword.txt")
if not os.path.exists(keyword_file):
logger.error(f"❌ 公共关键字文件不存在: {keyword_file}")
return None
try:
with open(keyword_file, 'r') as f:
keywords = [line.strip() for line in f.readlines() if line.strip()]
except Exception as e:
logger.error(f"❌ 读取公共关键字文件错误: {e}")
return None
logger.info(f"📝 从 {keyword_file} 加载 {len(keywords)} 个公共关键字")
return keywords
def parse_timestamp(timestamp_str, base_year=datetime.now().year):
"""解析日志时间戳字符串为datetime对象"""
try:
# 格式: "03-04 03:33:58.101"
# 将时间戳字符串分割为日期和时间部分
parts = timestamp_str.split()
if len(parts) < 2:
return None
date_part = parts[0]
time_part = parts[1]
# 时间部分可能有毫秒,也可能没有
if '.' in time_part:
time_format = "%H:%M:%S.%f"
else:
time_format = "%H:%M:%S"
# 组合成完整字符串
full_str = f"{base_year}-{date_part} {time_part}"
dt = datetime.strptime(full_str, "%Y-%m-%d %H:%M:%S.%f" if '.' in time_part else "%Y-%m-%d %H:%M:%S")
return dt
except Exception as e:
logger.error(f"解析时间戳错误: {timestamp_str}, 错误: {e}")
return None
def extract_timings_from_file(file_path, keywords):
"""从文件中提取关键字对应的时间值,处理特殊关键字"""
results = {}
timestamps = {}
if not os.path.exists(file_path):
logger.error(f"❌ 文件不存在: {file_path}")
return results
logger.info(f"🔎 在 {os.path.basename(file_path)} 中搜索关键字...")
try:
# 尝试多种编码打开文件
for encoding in ['utf-8', 'latin-1', 'cp1252']:
try:
with open(file_path, 'r', encoding=encoding) as f:
lines = f.readlines()
break
except UnicodeDecodeError:
continue
else:
# 所有编码都失败,使用二进制模式
with open(file_path, 'rb') as f:
lines = f.read().decode('utf-8', errors='ignore').splitlines()
except Exception as e:
logger.error(f"❌ 读取文件错误: {e}")
return results
# 收集所有关键字的时间戳
for line in lines:
for keyword in keywords:
if keyword in line:
# 提取时间戳部分(行首的前两个部分)
parts = line.strip().split()
if len(parts) < 2:
continue
timestamp_str = f"{parts[0]} {parts[1]}"
timestamp = parse_timestamp(timestamp_str)
if timestamp:
timestamps[keyword] = timestamp
logger.debug(f" 找到 {keyword} 时间戳: {timestamp_str}")
# 特殊处理:Booting关键字总是记录为0
results[BOOTING_KEY] = 0
logger.info(f" ✓ {BOOTING_KEY}: 设置为0 (特殊处理)")
# 处理每个关键字
for keyword in keywords:
if keyword == BOOTING_KEY:
# 已处理,跳过
continue
# 处理init first stage
if keyword == FIRST_STAGE_KEY:
booting_timestamp = timestamps.get(BOOTING_KEY)
first_stage_timestamp = timestamps.get(FIRST_STAGE_KEY)
if booting_timestamp and first_stage_timestamp:
# 计算与Booting的时间差(毫秒)
delta = (first_stage_timestamp - booting_timestamp).total_seconds() * 1000
results[keyword] = delta
logger.info(f" ✓ {FIRST_STAGE_KEY}: 差值计算 = {delta:.2f} ms (相对于Booting Linux)")
else:
results[keyword] = None
logger.warning(f" ⚠ {FIRST_STAGE_KEY}: 未找到或无法计算差值")
# 处理init second stage
elif keyword == SECOND_STAGE_KEY:
first_stage_timestamp = timestamps.get(FIRST_STAGE_KEY)
second_stage_timestamp = timestamps.get(SECOND_STAGE_KEY)
if first_stage_timestamp and second_stage_timestamp:
# 计算与First Stage的时间差(毫秒)
delta = (second_stage_timestamp - first_stage_timestamp).total_seconds() * 1000
results[keyword] = delta
logger.info(f" ✓ {SECOND_STAGE_KEY}: 差值计算 = {delta:.2f} ms (相对于init first stage)")
else:
results[keyword] = None
logger.warning(f" ⚠ {SECOND_STAGE_KEY}: 未找到或无法计算差值")
else:
# 普通关键字:尝试提取时间值
pattern = re.compile(rf"{re.escape(keyword)}.*?(\d+)", re.IGNORECASE)
match = None
for line in lines:
if keyword in line:
match = pattern.search(line)
if match:
break
if match:
try:
value = int(match.group(1))
results[keyword] = value
logger.info(f" ✓ {keyword}: {value} ms")
except (ValueError, IndexError):
results[keyword] = None
logger.warning(f" ⚠ {keyword}: 找到但无法解析数值")
else:
results[keyword] = None
logger.warning(f" ✗ {keyword}: 未找到")
return results
def write_to_excel_sheet(excel_file, sheet_name, data):
"""将数据写入Excel工作表"""
if not data:
logger.warning(f"⚠ 无数据可写入 {sheet_name} 工作表")
return
try:
wb = openpyxl.load_workbook(excel_file)
except Exception as e:
logger.error(f"❌ 加载Excel文件错误: {e}")
return
# 创建或选择工作表
if sheet_name in wb.sheetnames:
sheet = wb[sheet_name]
sheet.delete_rows(1, sheet.max_row) # 清空现有数据
else:
sheet = wb.create_sheet(sheet_name)
# 写入表头
sheet['A1'] = "关键字"
sheet['B1'] = "时间 (ms)"
# 写入数据
row = 2
for keyword in data:
value = data.get(keyword)
sheet.cell(row=row, column=1, value=keyword)
sheet.cell(row=row, column=2, value=value if value is not None else "N/A")
row += 1
# 调整列宽
sheet.column_dimensions['A'].width = 40
sheet.column_dimensions['B'].width = 15
try:
wb.save(excel_file)
logger.info(f"📝 数据已写入 {sheet_name} 工作表")
except Exception as e:
logger.error(f"❌ 保存Excel文件错误: {e}")
def create_comparison_sheet(excel_file, dut_data, ref_data, keywords, same_platform=False):
"""创建比较工作表并计算差值,处理特殊关键字"""
try:
wb = openpyxl.load_workbook(excel_file)
except Exception as e:
logger.error(f"❌ 加载Excel文件错误: {e}")
return
# 创建或选择工作表
if "comparison" in wb.sheetnames:
sheet = wb["comparison"]
sheet.delete_rows(1, sheet.max_row) # 清空现有数据
else:
sheet = wb.create_sheet("comparison", 0)
# 设置表头
headers = [
"关键字",
"DUT值 (ms)",
"REF值 (ms)",
"DUT阶段时间 (ms)",
"REF阶段时间 (ms)",
"差值 (ms)"
]
for col, header in enumerate(headers, 1):
sheet.cell(row=1, column=col, value=header)
# 准备标红样式
red_fill = PatternFill(start_color="FFFF0000", end_color="FFFF0000", fill_type="solid")
# 初始化前值
prev_dut = 0
prev_ref = 0
# 处理每个关键字
for row_idx, keyword in enumerate(keywords, 2):
dut_value = dut_data.get(keyword)
ref_value = ref_data.get(keyword)
# 写入关键字和原始值
sheet.cell(row=row_idx, column=1, value=keyword)
sheet.cell(row=row_idx, column=2, value=dut_value if dut_value is not None else "N/A")
sheet.cell(row=row_idx, column=3, value=ref_value if ref_value is not None else "N/A")
# 计算阶段时间
dut_stage = None
ref_stage = None
diff = None
# 特殊处理:Booting关键字总是0
if keyword == BOOTING_KEY:
sheet.cell(row=row_idx, column=4, value=0)
sheet.cell(row=row_idx, column=5, value=0)
sheet.cell(row=row_idx, column=6, value=0)
# 更新前值
prev_dut = dut_value if dut_value is not None else 0
prev_ref = ref_value if ref_value is not None else 0
continue
# 处理init first stage
if keyword == FIRST_STAGE_KEY:
if dut_value is not None:
# 已经是与Booting的差值
dut_stage = dut_value
sheet.cell(row=row_idx, column=4, value=dut_stage)
prev_dut = dut_value # 更新为绝对时间
if ref_value is not None:
# 已经是与Booting的差值
ref_stage = ref_value
sheet.cell(row=row_idx, column=5, value=ref_stage)
prev_ref = ref_value # 更新为绝对时间
# 处理init second stage
elif keyword == SECOND_STAGE_KEY:
if dut_value is not None:
# 已经是与first stage的差值
dut_stage = dut_value
sheet.cell(row=row_idx, column=4, value=dut_stage)
prev_dut = dut_value # 更新为绝对时间
if ref_value is not None:
# 已经是与first stage的差值
ref_stage = ref_value
sheet.cell(row=row_idx, column=5, value=ref_stage)
prev_ref = ref_value # 更新为绝对时间
# 处理普通关键字
else:
if dut_value is not None:
dut_stage = dut_value - prev_dut
sheet.cell(row=row_idx, column=4, value=dut_stage)
prev_dut = dut_value
if ref_value is not None:
ref_stage = ref_value - prev_ref
sheet.cell(row=row_idx, column=5, value=ref_stage)
prev_ref = ref_value
# 计算差值
if dut_st