# Read AWR Reports Without Writing SQL: Automating Oracle Performance Analysis with Python
Does your heart sink every time you are handed an Oracle AWR report that runs to hundreds of pages? The dense wait events, hit ratios, and SQL statistics read like a cipher, and working through them takes hours of paging, cross-referencing, and analysis. For developers and data analysts, analyzing AWR reports by hand is not only slow and laborious, it makes it easy to miss the key signals. Worse still, when you have to produce a weekly performance report, the sheer repetition becomes soul-crushing.
But have you considered letting Python do the heavy lifting? With a few straightforward scripts you can automatically parse the HTML or text of an AWR report, extract the key metrics, generate visualizations, and even build a performance baseline for trend analysis. That cuts analysis time from hours to minutes and makes performance monitoring smarter and more proactive.
Picture this: every Monday morning, the system emails you an analysis of the past week's AWR reports, complete with trend charts for the key metrics, a Top SQL list, and resource bottleneck warnings. Instead of opening report files one by one, you go straight to curated insights. That is the value of automated analysis with Python.
## 1. Understanding the Structure and Data Sources of AWR Reports
Before writing any code, we first need to understand what an AWR report actually contains. AWR (Automatic Workload Repository) is the workload repository Oracle introduced in 10g; it periodically captures snapshots of database performance and produces detailed reports containing hundreds of metrics. These reports are usually delivered as HTML, and while the structure is complex, it follows recognizable patterns.
### 1.1 Core Components of an AWR Report
A standard AWR report is built around the following key sections:
- **Basic database information**: instance name, host details, report time range, and so on
- **Load Profile**: key per-second and per-transaction metrics such as logical reads, physical reads, and parse counts
- **Instance Efficiency Percentages**: assorted cache hit ratios, the first indicators to check when judging database health
- **Top 5 wait events**: the most direct route to identifying performance bottlenecks
- **SQL statistics**: Top SQL ranked by elapsed time, CPU time, logical reads, and other dimensions
- **Segment statistics**: access patterns for hot tables and indexes
- **Time model statistics**: how database time is distributed across operations
Technically, an AWR report is a structured HTML document. It is large, but each section uses relatively fixed tags and styles, which is exactly what makes programmatic parsing feasible.
### 1.2 Choosing a Parsing Strategy
There are several parsing strategies to choose from:
| Strategy | Strengths | Weaknesses | Best suited for |
| --- | --- | --- | --- |
| Regular expressions | Flexible, no external dependencies | Complex code that is hard to maintain | Simple field extraction |
| BeautifulSoup | Strong HTML parsing, concise code | Requires an external library | Complex HTML structures |
| Pandas `read_html` | Parses tables straight into DataFrames | Limited support for non-standard tables | The standard tables inside AWR |
In real projects I usually combine them: BeautifulSoup for the overall HTML structure, regular expressions for extracting values with specific patterns, and Pandas for tabular data. The combination keeps parsing accurate while staying quick to develop.
> Note: the HTML structure of AWR reports can differ slightly between Oracle versions. Analyze a few sample reports first to confirm how the key data is located.
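To make the hybrid strategy concrete, here is a minimal sketch that applies all three techniques to a single report. The file name `awr_sample.html` is a placeholder for whatever report you have on disk:
```python
# Hybrid parsing sketch: pandas for tables, BeautifulSoup for structure,
# regex for scalar values. "awr_sample.html" is a placeholder file name.
import io
import re

import pandas as pd
from bs4 import BeautifulSoup

with open("awr_sample.html", encoding="utf-8") as f:
    html = f.read()

# 1. Pandas: pull every <table> into a DataFrame for quick inspection.
tables = pd.read_html(io.StringIO(html))
print(f"Found {len(tables)} tables in the report")

# 2. BeautifulSoup: locate a section by its heading text.
soup = BeautifulSoup(html, "lxml")
heading = soup.find(lambda t: t.name in ("h2", "h3") and "Load Profile" in t.text)
print("Load Profile section found:", heading is not None)

# 3. Regex: grab a single scalar such as the DB name from the title.
m = re.search(r"DB:\s*(\w+)", soup.title.text if soup.title else "")
if m:
    print("DB name:", m.group(1))
```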
## 2. Setting Up the Python Environment and Basic Tooling
As the saying goes, to do good work one must first sharpen one's tools. Before parsing AWR reports we need a suitable Python environment. I recommend the Anaconda distribution: it bundles the usual data science libraries and is easy to manage.
### 2.1 Environment Setup and Dependencies
First, create a dedicated virtual environment to manage the project's dependencies:
```bash
# Create the virtual environment
conda create -n awr_analysis python=3.9
conda activate awr_analysis
# Install the core dependencies
pip install beautifulsoup4 lxml pandas matplotlib seaborn
pip install jupyterlab  # optional, for interactive analysis
```
Each library has its role: BeautifulSoup parses HTML, lxml is the fast parsing engine underneath it, Pandas handles the data, and Matplotlib and Seaborn draw the charts. If you also need PDF output, add ReportLab or WeasyPrint.
### 2.2 Designing a Basic Parser Class
A good utility class should hide the messy parsing logic behind a clean API. Below is the basic AWR parser skeleton I use in real projects:
```python
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import pandas as pd
from bs4 import BeautifulSoup


@dataclass
class AWRReport:
    """Container for the data parsed out of one AWR report."""
    db_name: str
    instance_name: str
    host_name: str
    snapshot_range: Tuple[Optional[datetime], Optional[datetime]]
    elapsed_minutes: float
    load_profile: Dict[str, float]
    instance_efficiency: Dict[str, float]
    top_events: List[Dict]
    top_sql_by_elapsed: pd.DataFrame


class AWRParser:
    """Parser for AWR reports in HTML format."""

    def __init__(self, html_content: str):
        self.soup = BeautifulSoup(html_content, 'lxml')
        self.report = AWRReport(
            db_name="",
            instance_name="",
            host_name="",
            snapshot_range=(None, None),
            elapsed_minutes=0.0,
            load_profile={},
            instance_efficiency={},
            top_events=[],
            top_sql_by_elapsed=pd.DataFrame()
        )

    def extract_basic_info(self) -> None:
        """Extract basic database information."""
        # Match the database and instance names with a regular expression.
        title_text = self.soup.find('title').text if self.soup.find('title') else ""
        # Example of the pattern being matched:
        # WORKLOAD REPOSITORY report for DB: ORCL, Instance: orcl1
        pattern = r"WORKLOAD REPOSITORY report for DB:\s*([^,]+),\s*Instance:\s*([^\n]+)"
        match = re.search(pattern, title_text, re.IGNORECASE)
        if match:
            self.report.db_name = match.group(1).strip()
            self.report.instance_name = match.group(2).strip()
        # Extract the snapshot time range.
        snap_pattern = r"Snap Id\s*Snap Started\s*Snap Ended\s*\n\s*(\d+)\s+(\d{2}-[A-Za-z]{3}-\d{2} \d{2}:\d{2})\s+(\d{2}-[A-Za-z]{3}-\d{2} \d{2}:\d{2})"
        snap_match = re.search(snap_pattern, self.soup.text, re.MULTILINE)
        if snap_match:
            start_time = datetime.strptime(snap_match.group(2), '%d-%b-%y %H:%M')
            end_time = datetime.strptime(snap_match.group(3), '%d-%b-%y %H:%M')
            self.report.snapshot_range = (start_time, end_time)
            self.report.elapsed_minutes = (end_time - start_time).total_seconds() / 60

    def parse_load_profile(self) -> None:
        """Parse the Load Profile section."""
        # _find_section returns the first table after the heading; its second
        # column holds the per-second values. The per-transaction column could
        # be parsed the same way if you need it.
        load_profile_table = self._find_section('Load Profile')
        if load_profile_table is None:
            return
        self.report.load_profile = self._parse_metrics_table(load_profile_table)

    def _find_section(self, section_name: str):
        """Locate a section by its heading and return the table that follows it."""
        for tag in self.soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
            if section_name.lower() in tag.text.lower():
                # find_next also matches non-sibling tables, which is more
                # robust than walking siblings across different AWR layouts.
                return tag.find_next('table')
        return None

    def _parse_metrics_table(self, table) -> Dict[str, float]:
        """Parse a two-column metrics table into a name -> value dict."""
        metrics = {}
        if not table:
            return metrics
        rows = table.find_all('tr')
        for row in rows[1:]:  # skip the header row
            cells = row.find_all('td')
            if len(cells) >= 2:
                metric_name = cells[0].text.strip()
                metric_value = cells[1].text.strip()
                # Clean the value: drop thousands separators, expand K/M/G units.
                try:
                    if 'K' in metric_value:
                        value = float(metric_value.replace('K', '').replace(',', '')) * 1000
                    elif 'M' in metric_value:
                        value = float(metric_value.replace('M', '').replace(',', '')) * 1000000
                    elif 'G' in metric_value:
                        value = float(metric_value.replace('G', '').replace(',', '')) * 1000000000
                    else:
                        value = float(metric_value.replace(',', ''))
                    metrics[metric_name] = value
                except ValueError:
                    # Conversion failed; skip values in unexpected formats.
                    pass
        return metrics
```
This skeleton covers the core of AWR parsing. In practice you may need to adjust the regular expressions and parsing logic to match the exact format of your reports.
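Before wiring the parser into anything larger, it is worth exercising it on one file. A minimal usage sketch, with the file name as a placeholder:
```python
# Minimal usage sketch for AWRParser; the file name is a placeholder.
with open("awr_report_20240101.html", encoding="utf-8") as f:
    parser = AWRParser(f.read())

parser.extract_basic_info()
parser.parse_load_profile()

report = parser.report
print(f"DB: {report.db_name}, instance: {report.instance_name}")
print(f"Window: {report.snapshot_range[0]} -> {report.snapshot_range[1]} "
      f"({report.elapsed_minutes:.0f} min)")
for name, value in list(report.load_profile.items())[:5]:
    print(f"  {name}: {value:,.1f}")
```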
## 3. Extracting and Processing Key Performance Metrics
A handful of sections in an AWR report matter most for performance analysis. We want to extract those metrics specifically and transform them into something usable.
### 3.1 Parsing and Grading Instance Efficiency Metrics
The instance efficiency percentages are the first indicators of database health. They are reported as percentages; we want to extract them and derive statistics such as averages and trends.
```python
class InstanceEfficiencyAnalyzer:
    """Analyzer for the Instance Efficiency Percentages section."""

    # Key efficiency metrics and their health thresholds.
    EFFICIENCY_METRICS = {
        'Buffer Nowait %': {'min': 99.0, 'critical': 95.0},
        'Buffer Hit %': {'min': 95.0, 'critical': 80.0},
        'Library Hit %': {'min': 95.0, 'critical': 90.0},
        'Soft Parse %': {'min': 95.0, 'critical': 80.0},
        'Execute to Parse %': {'min': 70.0, 'critical': 50.0},
        'Latch Hit %': {'min': 99.0, 'critical': 95.0},
        'Parse CPU to Parse Elapsd %': {'min': 90.0, 'critical': 70.0},
        'Non-Parse CPU': {'min': 90.0, 'critical': 70.0}
    }

    def __init__(self, efficiency_data: Dict[str, float]):
        self.metrics = efficiency_data
        self.health_status = {}

    def analyze_health(self) -> Dict:
        """Evaluate the health status of each metric."""
        results = {}
        for metric_name, thresholds in self.EFFICIENCY_METRICS.items():
            if metric_name in self.metrics:
                value = self.metrics[metric_name]
                status = self._evaluate_metric(value, thresholds)
                results[metric_name] = {
                    'value': value,
                    'status': status,
                    'threshold_min': thresholds['min'],
                    'threshold_critical': thresholds['critical']
                }
        # Compute an overall health score.
        healthy_count = sum(1 for r in results.values() if r['status'] == 'healthy')
        total_count = len(results)
        overall_health = healthy_count / total_count * 100 if total_count > 0 else 0
        return {
            'detailed': results,
            'overall_health': overall_health,
            'healthy_count': healthy_count,
            'total_count': total_count
        }

    def _evaluate_metric(self, value: float, thresholds: Dict) -> str:
        """Grade a single metric against its thresholds."""
        if value >= thresholds['min']:
            return 'healthy'
        elif value >= thresholds['critical']:
            return 'warning'
        else:
            return 'critical'

    def generate_summary_table(self) -> pd.DataFrame:
        """Build a summary table of all evaluated metrics."""
        analysis = self.analyze_health()
        rows = []
        for metric_name, info in analysis['detailed'].items():
            rows.append({
                'Metric': metric_name,
                'Current': f"{info['value']:.2f}%",
                'Healthy threshold': f"{info['threshold_min']}%",
                'Critical threshold': f"{info['threshold_critical']}%",
                'Status': info['status'],
                'Recommendation': self._get_recommendation(metric_name, info['status'])
            })
        df = pd.DataFrame(rows)
        return df

    def _get_recommendation(self, metric: str, status: str) -> str:
        """Return a tuning recommendation based on a metric's status."""
        recommendations = {
            'Buffer Hit %': {
                'healthy': 'Cache hit ratio is good; no action needed',
                'warning': 'Consider enlarging the buffer cache',
                'critical': 'Enlarge the buffer cache urgently and check hot SQL'
            },
            'Soft Parse %': {
                'healthy': 'Soft parse ratio is normal; bind variables are used well',
                'warning': 'Check whether the application uses bind variables',
                'critical': 'Heavy hard parsing; the application code needs urgent fixes'
            },
            'Execute to Parse %': {
                'healthy': 'SQL reuse is good',
                'warning': 'Parse ratio is high; check cursor sharing',
                'critical': 'Parsing overhead is excessive; optimize the application logic'
            }
        }
        return recommendations.get(metric, {}).get(status, 'See the Oracle documentation')
```
The analyzer does more than extract values: it grades each metric against preset thresholds and attaches targeted tuning advice. In practice you can adjust the thresholds to fit your own workload.
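You can try the analyzer without a real report by feeding it hand-made numbers; the sketch below grades a deliberately low Soft Parse % as a warning:
```python
# Sketch: feed the analyzer hand-made numbers to see the health evaluation.
sample_efficiency = {
    'Buffer Hit %': 97.3,
    'Library Hit %': 99.1,
    'Soft Parse %': 82.5,   # below the 95% threshold -> warning
    'Latch Hit %': 99.9,
}
analyzer = InstanceEfficiencyAnalyzer(sample_efficiency)
result = analyzer.analyze_health()
print(f"Overall health: {result['overall_health']:.0f}% "
      f"({result['healthy_count']}/{result['total_count']} metrics healthy)")
print(analyzer.generate_summary_table().to_string(index=False))
```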
### 3.2 Deep Analysis of Top SQL
The Top SQL section is among the most valuable parts of an AWR report. We want to extract the details of these statements and classify them for analysis.
```python
class SQLAnalyzer:
    """Analyzer for Top SQL performance data."""

    def __init__(self, sql_data: pd.DataFrame):
        self.sql_data = sql_data
        self.categorized_sql = {}

    def categorize_by_resource(self) -> Dict[str, List]:
        """Categorize SQL statements by the resource they consume most."""
        categories = {
            'cpu_intensive': [],     # CPU-bound
            'io_intensive': [],      # I/O-bound
            'memory_intensive': [],  # memory-bound
            'parse_intensive': [],   # parse-heavy
            'long_running': []       # long elapsed time
        }
        if self.sql_data.empty:
            return categories
        # Use the 75th percentile of each metric as the cutoff.
        cpu_p75 = self.sql_data['CPU Time (s)'].quantile(0.75)
        io_p75 = self.sql_data['Physical Reads'].quantile(0.75) if 'Physical Reads' in self.sql_data.columns else 0
        elapsed_p75 = self.sql_data['Elapsed Time (s)'].quantile(0.75)
        for _, row in self.sql_data.iterrows():
            sql_info = {
                'sql_id': row.get('SQL Id', ''),
                'elapsed_time': row.get('Elapsed Time (s)', 0),
                'cpu_time': row.get('CPU Time (s)', 0),
                'physical_reads': row.get('Physical Reads', 0),
                'executions': row.get('Executions', 1),
                'sql_text': str(row.get('SQL Text', ''))[:100]  # first 100 characters
            }
            # Categorization rules (a statement can land in several buckets).
            if sql_info['cpu_time'] > cpu_p75:
                categories['cpu_intensive'].append(sql_info)
            if sql_info['physical_reads'] > io_p75:
                categories['io_intensive'].append(sql_info)
            if sql_info['elapsed_time'] > elapsed_p75:
                categories['long_running'].append(sql_info)
        self.categorized_sql = categories
        return categories

    def identify_hot_objects(self) -> pd.DataFrame:
        """Identify hot objects (tables, indexes) from SQL text."""
        object_patterns = [
            r'FROM\s+([\w\.]+)',          # FROM clause
            r'JOIN\s+([\w\.]+)',          # JOIN clause
            r'UPDATE\s+([\w\.]+)',        # UPDATE statement
            r'DELETE\s+FROM\s+([\w\.]+)'  # DELETE statement
        ]
        object_counts = {}
        for _, row in self.sql_data.iterrows():
            sql_text = row.get('SQL Text', '')
            if not sql_text:
                continue
            for pattern in object_patterns:
                matches = re.findall(pattern, sql_text, re.IGNORECASE)
                for match in matches:
                    # Clean the object name (strip aliases and quotes).
                    obj_name = match.split()[-1]
                    obj_name = obj_name.strip('"').strip("'")
                    if obj_name in object_counts:
                        object_counts[obj_name]['count'] += 1
                        object_counts[obj_name]['total_elapsed'] += row.get('Elapsed Time (s)', 0)
                        object_counts[obj_name]['total_cpu'] += row.get('CPU Time (s)', 0)
                    else:
                        object_counts[obj_name] = {
                            'count': 1,
                            'total_elapsed': row.get('Elapsed Time (s)', 0),
                            'total_cpu': row.get('CPU Time (s)', 0)
                        }
        # Convert to a DataFrame, derive the averages, and sort.
        if object_counts:
            df = pd.DataFrame.from_dict(object_counts, orient='index')
            df['avg_elapsed'] = df['total_elapsed'] / df['count']
            df['avg_cpu'] = df['total_cpu'] / df['count']
            df = df.sort_values('total_elapsed', ascending=False)
            return df
        return pd.DataFrame()

    def generate_sql_optimization_report(self) -> Dict:
        """Generate a report of SQL tuning recommendations."""
        categories = self.categorize_by_resource()
        hot_objects = self.identify_hot_objects()
        recommendations = []
        # Recommendations for CPU-bound SQL (top 5).
        for sql in categories.get('cpu_intensive', [])[:5]:
            recommendations.append({
                'type': 'CPU optimization',
                'sql_id': sql['sql_id'],
                'issue': f"High CPU consumption: {sql['cpu_time']:.2f}s",
                'actions': [
                    'Verify that the execution plan is reasonable',
                    'Consider adding or tuning indexes',
                    'Evaluate rewriting the SQL',
                    'Check bind variable usage'
                ]
            })
        # Recommendations for I/O-bound SQL (top 5).
        for sql in categories.get('io_intensive', [])[:5]:
            recommendations.append({
                'type': 'I/O optimization',
                'sql_id': sql['sql_id'],
                'issue': f"High physical reads: {sql['physical_reads']}",
                'actions': [
                    'Consider enlarging the buffer cache',
                    'Check whether full table scans are necessary',
                    'Evaluate a partitioning strategy',
                    'Check for unusable indexes'
                ]
            })
        # Recommendations for hot objects (top 3).
        if not hot_objects.empty:
            top_objects = hot_objects.head(3)
            for obj_name, row in top_objects.iterrows():
                recommendations.append({
                    'type': 'Hot object',
                    'object': obj_name,
                    'issue': f"Accessed {int(row['count'])} times, "
                             f"total elapsed {row['total_elapsed']:.2f}s",
                    'actions': [
                        'Consider adding caching',
                        'Evaluate read/write splitting',
                        'Review the index design',
                        'Monitor lock contention'
                    ]
                })
        return {
            'recommendations': recommendations,
            'summary': {
                'total_sql_analyzed': len(self.sql_data),
                'cpu_intensive_count': len(categories.get('cpu_intensive', [])),
                'io_intensive_count': len(categories.get('io_intensive', [])),
                'hot_objects_count': len(hot_objects)
            }
        }
```
This kind of deep dive does more than surface problem SQL; it produces concrete tuning suggestions. In my own projects this analyzer has exposed several hidden issues, such as excessive hard parsing caused by missing bind variables and frequent full table scans caused by missing indexes.
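The sketch below runs the analyzer on a tiny hand-built DataFrame whose column names match what the parser is expected to produce; the SQL IDs and numbers are made up:
```python
# Sketch: run the analyzer on a tiny hand-built Top SQL DataFrame.
import pandas as pd

sample_sql = pd.DataFrame([
    {'SQL Id': 'a1b2c3', 'Elapsed Time (s)': 120.5, 'CPU Time (s)': 95.0,
     'Physical Reads': 10, 'Executions': 500,
     'SQL Text': 'SELECT * FROM orders o JOIN customers c ON o.cid = c.id'},
    {'SQL Id': 'd4e5f6', 'Elapsed Time (s)': 80.2, 'CPU Time (s)': 5.1,
     'Physical Reads': 250000, 'Executions': 12,
     'SQL Text': 'SELECT /*+ FULL(t) */ * FROM big_table t'},
])
sql_analyzer = SQLAnalyzer(sample_sql)
opt_report = sql_analyzer.generate_sql_optimization_report()
print(opt_report['summary'])
for rec in opt_report['recommendations']:
    print(rec['type'], rec.get('sql_id', rec.get('object', '')), '-', rec['issue'])
```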
## 4. Data Visualization and Trend Analysis
Raw numbers are accurate but not intuitive. Visualization makes performance trends and problem patterns much easier to grasp.
### 4.1 Building a Performance Dashboard with Matplotlib
```python
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from matplotlib.gridspec import GridSpec


class AWRVisualizer:
    """Visualizer for AWR data."""

    def __init__(self, style='seaborn-v0_8'):
        # matplotlib >= 3.6 renamed the 'seaborn' style to 'seaborn-v0_8';
        # fall back to the default style if the name is unavailable.
        try:
            plt.style.use(style)
        except OSError:
            plt.style.use('default')
        sns.set_palette("husl")
        self.fig = None

    def create_performance_dashboard(self, awr_reports: List[AWRReport]) -> plt.Figure:
        """Create the performance dashboard figure."""
        self.fig = plt.figure(figsize=(16, 12))
        gs = GridSpec(3, 3, figure=self.fig)
        # 1. Trend lines for key metrics
        ax1 = self.fig.add_subplot(gs[0, :])
        self._plot_key_metrics_trend(ax1, awr_reports)
        # 2. Instance efficiency radar chart
        ax2 = self.fig.add_subplot(gs[1, 0], projection='polar')
        self._plot_efficiency_radar(ax2, awr_reports[-1] if awr_reports else None)
        # 3. Top wait events pie chart
        ax3 = self.fig.add_subplot(gs[1, 1])
        self._plot_top_events_pie(ax3, awr_reports[-1] if awr_reports else None)
        # 4. SQL resource consumption scatter plot
        ax4 = self.fig.add_subplot(gs[1, 2])
        self._plot_sql_resource_scatter(ax4, awr_reports[-1] if awr_reports else None)
        # 5. Load profile heatmap
        ax5 = self.fig.add_subplot(gs[2, :])
        self._plot_load_profile_heatmap(ax5, awr_reports)
        plt.tight_layout()
        return self.fig

    def _plot_key_metrics_trend(self, ax, reports: List[AWRReport]):
        """Plot trend lines for a few key metrics."""
        if not reports:
            return
        dates = [r.snapshot_range[0] for r in reports]
        # Pick a few key metrics
        metrics_to_plot = [
            ('Logical Reads', 'Logical reads/s'),
            ('Physical Reads', 'Physical reads/s'),
            ('Executions', 'Executions/s'),
            ('DB Time', 'DB time (min)')
        ]
        for metric, label in metrics_to_plot:
            values = []
            for report in reports:
                if metric == 'DB Time':
                    # Placeholder: uses the snapshot elapsed minutes; real DB
                    # time comes from the Time Model section.
                    values.append(report.elapsed_minutes)
                else:
                    values.append(report.load_profile.get(metric, 0))
            ax.plot(dates, values, marker='o', label=label, linewidth=2)
        ax.set_title('Key Performance Metric Trends', fontsize=14, fontweight='bold')
        ax.set_xlabel('Time')
        ax.set_ylabel('Value')
        ax.legend()
        ax.grid(True, alpha=0.3)
        # Rotate the date labels
        plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)

    def _plot_efficiency_radar(self, ax, report: AWRReport):
        """Plot the instance efficiency radar chart."""
        if not report or not report.instance_efficiency:
            return
        # Pick a few key efficiency metrics
        key_metrics = [
            'Buffer Hit %',
            'Library Hit %',
            'Soft Parse %',
            'Execute to Parse %',
            'Latch Hit %'
        ]
        values = []
        labels = []
        for metric in key_metrics:
            if metric in report.instance_efficiency:
                values.append(report.instance_efficiency[metric] / 100)  # scale to 0-1
                labels.append(metric.replace(' %', ''))
        if not values:
            return
        # Compute one angle per metric first, then close the polygon by
        # repeating the first point (computing the angles after appending a
        # closing label would leave the two lists with different lengths).
        angles = np.linspace(0, 2 * np.pi, len(labels), endpoint=False).tolist()
        values.append(values[0])
        angles.append(angles[0])
        ax.plot(angles, values, 'o-', linewidth=2)
        ax.fill(angles, values, alpha=0.25)
        ax.set_xticks(angles[:-1])
        ax.set_xticklabels(labels)
        ax.set_ylim(0, 1)
        ax.set_title('Instance Efficiency Radar', fontsize=12, fontweight='bold')
        ax.grid(True)

    def _plot_top_events_pie(self, ax, report: AWRReport):
        """Plot a pie chart of the top wait events."""
        if not report or not report.top_events:
            return
        # Take the top 5 wait events
        top_5 = report.top_events[:5]
        event_names = [e.get('Event', f'Event_{i}') for i, e in enumerate(top_5)]
        wait_times = [e.get('Total Wait Time (s)', 0) for e in top_5]
        total = sum(wait_times)
        if total == 0:
            return
        wedges, texts, autotexts = ax.pie(
            wait_times,
            labels=event_names,
            autopct='%1.1f%%',
            startangle=90,
            explode=[0.05] * len(wait_times)  # separate the wedges slightly
        )
        ax.set_title('Top 5 Wait Event Distribution', fontsize=12, fontweight='bold')
        # Style the percentage labels
        for autotext in autotexts:
            autotext.set_color('white')
            autotext.set_fontweight('bold')

    def _plot_sql_resource_scatter(self, ax, report: AWRReport):
        """Plot a scatter chart of SQL resource consumption."""
        if not report or report.top_sql_by_elapsed.empty:
            return
        df = report.top_sql_by_elapsed.head(20)  # top 20 statements
        if 'CPU Time (s)' not in df.columns or 'Elapsed Time (s)' not in df.columns:
            return
        # scatter needs array-likes; fall back gracefully if a column is absent
        sizes = df['Executions'] if 'Executions' in df.columns else 50
        colors = df['Physical Reads'] if 'Physical Reads' in df.columns else None
        scatter = ax.scatter(
            df['CPU Time (s)'],
            df['Elapsed Time (s)'],
            s=sizes,        # marker size encodes execution count
            alpha=0.6,
            c=colors,       # color encodes physical reads
            cmap='viridis' if colors is not None else None
        )
        ax.set_xlabel('CPU time (s)')
        ax.set_ylabel('Elapsed time (s)')
        ax.set_title('SQL Resource Consumption', fontsize=12, fontweight='bold')
        ax.grid(True, alpha=0.3)
        if colors is not None:
            plt.colorbar(scatter, ax=ax, label='Physical reads')
        # Flag outliers whose elapsed time dwarfs their CPU time (wait-heavy)
        for idx, row in df.iterrows():
            if row['Elapsed Time (s)'] > row['CPU Time (s)'] * 3:
                ax.annotate(
                    f"SQL{idx}",
                    (row['CPU Time (s)'], row['Elapsed Time (s)']),
                    xytext=(5, 5),
                    textcoords='offset points',
                    fontsize=8,
                    color='red'
                )

    def _plot_load_profile_heatmap(self, ax, reports: List[AWRReport]):
        """Plot a heatmap of the load profile across reports."""
        if len(reports) < 2:
            return
        # Metrics to display
        metrics = [
            'Logical Reads',
            'Physical Reads',
            'Executions',
            'Transactions',
            'Redo size'
        ]
        # Prepare the data
        data = []
        dates = []
        for report in reports[-10:]:  # show the latest 10 reports only
            row = []
            for metric in metrics:
                value = report.load_profile.get(metric, 0)
                # Normalize units
                if metric == 'Redo size':
                    value = value / 1024 / 1024  # convert to MB
                row.append(value)
            data.append(row)
            dates.append(report.snapshot_range[0].strftime('%m-%d %H:%M'))
        data = np.array(data)
        # Draw the heatmap
        im = ax.imshow(data.T, aspect='auto', cmap='YlOrRd')
        # Configure the axes
        ax.set_xticks(range(len(dates)))
        ax.set_xticklabels(dates, rotation=45, ha='right')
        ax.set_yticks(range(len(metrics)))
        ax.set_yticklabels(metrics)
        ax.set_title('Load Profile Heatmap', fontsize=12, fontweight='bold')
        # Write the value inside each cell
        for i in range(len(dates)):
            for j in range(len(metrics)):
                ax.text(i, j, f'{data[i, j]:.1f}',
                        ha="center", va="center", color="black", fontsize=8)
        plt.colorbar(im, ax=ax)
```
This visualizer assembles a complete performance dashboard: trend lines, a radar chart, a pie chart, a scatter plot, and a heatmap. In practice I have found that viewing the data from several angles at once helps the team grasp the database's state quickly.
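Putting it to use is a one-liner once you have parsed reports in hand. A sketch, where `parsed_reports` stands in for a list of `AWRReport` objects produced by `AWRParser`:
```python
# Sketch: render the dashboard for previously parsed reports and save it.
# `parsed_reports` is assumed to be a list of AWRReport objects built by
# AWRParser (see the usage sketch in section 2.2).
visualizer = AWRVisualizer()
fig = visualizer.create_performance_dashboard(parsed_reports)
fig.savefig("awr_dashboard.png", dpi=150, bbox_inches="tight")
print("Dashboard written to awr_dashboard.png")
```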
### 4.2 Generating Interactive HTML Reports
Beyond static charts, we can generate interactive HTML reports that are convenient to view and explore in a browser.
```python
from jinja2 import Template
import plotly.graph_objects as go


class InteractiveReportGenerator:
    """Interactive HTML report generator."""

    def __init__(self):
        self.template = """
<!DOCTYPE html>
<html>
<head>
    <title>AWR Performance Report - {{ report_date }}</title>
    <script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .header { background: #f0f0f0; padding: 20px; border-radius: 5px; }
        .section { margin: 30px 0; padding: 20px; border: 1px solid #ddd; border-radius: 5px; }
        .metric-card {
            display: inline-block;
            width: 200px;
            margin: 10px;
            padding: 15px;
            border-radius: 5px;
            text-align: center;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        .healthy { background: #d4edda; color: #155724; }
        .warning { background: #fff3cd; color: #856404; }
        .critical { background: #f8d7da; color: #721c24; }
        table { width: 100%; border-collapse: collapse; margin: 10px 0; }
        th, td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }
        th { background-color: #f2f2f2; }
        tr:hover { background-color: #f5f5f5; }
    </style>
</head>
<body>
    <div class="header">
        <h1>📊 AWR Performance Analysis Report</h1>
        <p>Generated at: {{ generation_time }}</p>
        <p>Database: {{ db_name }} | Instance: {{ instance_name }}</p>
        <p>Report period: {{ start_time }} to {{ end_time }}</p>
    </div>
    <div class="section">
        <h2>📈 Key Metrics Overview</h2>
        {% for metric in metrics %}
        <div class="metric-card {{ metric.status }}">
            <h3>{{ metric.name }}</h3>
            <p style="font-size: 24px; font-weight: bold;">{{ metric.value }}</p>
            <p>{{ metric.description }}</p>
        </div>
        {% endfor %}
    </div>
    <div class="section">
        <h2>📊 Performance Trends</h2>
        <div id="trend-chart"></div>
    </div>
    <div class="section">
        <h2>⚡ Top SQL Analysis</h2>
        <table>
            <thead>
                <tr>
                    <th>SQL ID</th>
                    <th>Elapsed Time</th>
                    <th>CPU Time</th>
                    <th>Physical Reads</th>
                    <th>Executions</th>
                    <th>Recommendation</th>
                </tr>
            </thead>
            <tbody>
                {% for sql in top_sql %}
                <tr>
                    <td><code>{{ sql.sql_id }}</code></td>
                    <td>{{ sql.elapsed_time }}s</td>
                    <td>{{ sql.cpu_time }}s</td>
                    <td>{{ sql.physical_reads }}</td>
                    <td>{{ sql.executions }}</td>
                    <td>{{ sql.recommendation }}</td>
                </tr>
                {% endfor %}
            </tbody>
        </table>
    </div>
    <div class="section">
        <h2>🔍 Wait Event Analysis</h2>
        <div id="wait-events-chart"></div>
    </div>
    <script>
        // Trend chart data
        var trendData = {{ trend_data|tojson }};
        // Wait event data
        var waitEventsData = {{ wait_events_data|tojson }};
        // Render the charts
        Plotly.newPlot('trend-chart', trendData.data, trendData.layout);
        Plotly.newPlot('wait-events-chart', waitEventsData.data, waitEventsData.layout);
    </script>
</body>
</html>
"""

    def generate_html(self, analysis_results: Dict) -> str:
        """Render the HTML report."""
        template = Template(self.template)
        # Prepare the chart payloads
        trend_fig = self._create_trend_chart(analysis_results.get('trend_data', []))
        wait_events_fig = self._create_wait_events_chart(analysis_results.get('wait_events', []))
        html_content = template.render(
            report_date=analysis_results.get('report_date', ''),
            generation_time=analysis_results.get('generation_time', ''),
            db_name=analysis_results.get('db_name', ''),
            instance_name=analysis_results.get('instance_name', ''),
            start_time=analysis_results.get('start_time', ''),
            end_time=analysis_results.get('end_time', ''),
            metrics=analysis_results.get('key_metrics', []),
            top_sql=analysis_results.get('top_sql', []),
            trend_data=trend_fig.to_dict(),
            wait_events_data=wait_events_fig.to_dict()
        )
        return html_content

    def _create_trend_chart(self, trend_data: List) -> go.Figure:
        """Build the trend chart."""
        fig = go.Figure()
        if not trend_data:
            return fig
        # Add one trace per metric
        metrics = ['logical_reads', 'physical_reads', 'executions']
        colors = ['#1f77b4', '#ff7f0e', '#2ca02c']
        for metric, color in zip(metrics, colors):
            if metric in trend_data[0]:
                dates = [d['date'] for d in trend_data]
                values = [d[metric] for d in trend_data]
                fig.add_trace(go.Scatter(
                    x=dates,
                    y=values,
                    mode='lines+markers',
                    name=metric.replace('_', ' ').title(),
                    line=dict(color=color, width=2),
                    marker=dict(size=6)
                ))
        fig.update_layout(
            title='Key Metric Trends',
            xaxis_title='Time',
            yaxis_title='Value',
            hovermode='x unified',
            template='plotly_white'
        )
        return fig

    def _create_wait_events_chart(self, wait_events: List) -> go.Figure:
        """Build the wait event chart."""
        if not wait_events:
            return go.Figure()
        events = [e['event'] for e in wait_events]
        times = [e['wait_time'] for e in wait_events]
        fig = go.Figure(data=[go.Bar(
            x=events,
            y=times,
            marker_color='rgb(55, 83, 109)'
        )])
        fig.update_layout(
            title='Top Wait Events',
            xaxis_title='Wait event',
            yaxis_title='Wait time (s)',
            template='plotly_white'
        )
        return fig
```
Interactive reports are not just prettier; they are more capable. Users can hover for details and zoom into specific time windows, which makes exploring the data far more pleasant.
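Rendering a report is just a matter of passing the analysis dictionary in; a quick sketch with made-up values:
```python
# Sketch: render the interactive report from a hand-built analysis dict.
generator = InteractiveReportGenerator()
html = generator.generate_html({
    'report_date': '2024-01-01',
    'generation_time': '2024-01-01 08:00:00',
    'db_name': 'ORCL', 'instance_name': 'orcl1',
    'start_time': '2024-01-01 00:00', 'end_time': '2024-01-01 08:00',
    'key_metrics': [{'name': 'Buffer Hit %', 'value': '97.3%',
                     'status': 'healthy', 'description': 'Cache hit ratio'}],
    'top_sql': [], 'trend_data': [], 'wait_events': [],
})
with open("awr_interactive.html", "w", encoding="utf-8") as f:
    f.write(html)
```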
## 5. Case Study: Building an Automated Performance Monitoring System
With the building blocks above, we can now assemble a complete automated monitoring system: one that periodically collects AWR reports, analyzes them, generates reports, and raises alerts when it finds problems.
### 5.1 System Architecture
```python
import logging
import smtplib
import time
from datetime import datetime, timedelta
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path

import schedule


class AWRMonitoringSystem:
    """Automated AWR monitoring system."""

    def __init__(self, config: Dict):
        self.config = config
        self.setup_logging()
        self.report_history = []
        # Create the required directories
        self.reports_dir = Path(config.get('reports_dir', './awr_reports'))
        self.reports_dir.mkdir(exist_ok=True)
        self.data_dir = Path(config.get('data_dir', './awr_data'))
        self.data_dir.mkdir(exist_ok=True)

    def setup_logging(self):
        """Configure logging."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('awr_monitor.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def collect_awr_reports(self, days_back: int = 7) -> List[Path]:
        """Collect AWR reports from the last N days."""
        self.logger.info(f"Collecting AWR reports for the last {days_back} day(s)")
        report_files = []
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days_back)
        # The collection logic depends on your environment: reports might be
        # downloaded from the database server or read from a shared directory.
        try:
            # Simulated collection loop
            for i in range(days_back):
                report_date = start_date + timedelta(days=i)
                report_file = self._download_awr_report(report_date)
                if report_file:
                    report_files.append(report_file)
            self.logger.info(f"Collected {len(report_files)} AWR report(s)")
            return report_files
        except Exception as e:
            self.logger.error(f"Failed to collect AWR reports: {e}")
            return []

    def _download_awr_report(self, report_date: datetime) -> Optional[Path]:
        """Download the AWR report for a given date."""
        # In a real project this would SSH to the database server and run
        # awrrpt.sql, or copy the file from shared storage.
        # Simulated implementation:
        report_filename = f"awr_report_{report_date.strftime('%Y%m%d')}.html"
        report_path = self.data_dir / report_filename
        if not report_path.exists():
            self.logger.warning(f"Report file not found: {report_path}")
            return None
        return report_path

    def analyze_reports(self, report_files: List[Path]) -> Dict:
        """Analyze a batch of AWR reports."""
        self.logger.info(f"Analyzing {len(report_files)} report(s)")
        all_reports = []
        analysis_results = {
            'trends': [],
            'alerts': [],
            'summary': {}
        }
        for report_file in report_files:
            try:
                # Parse a single report
                with open(report_file, 'r', encoding='utf-8') as f:
                    html_content = f.read()
                parser = AWRParser(html_content)
                parser.extract_basic_info()
                parser.parse_load_profile()
                report_data = parser.report
                all_reports.append(report_data)
                # Check the alert conditions
                alerts = self._check_alerts(report_data)
                if alerts:
                    analysis_results['alerts'].extend(alerts)
            except Exception as e:
                self.logger.error(f"Failed to parse report {report_file}: {e}")
                continue
        # Analyze trends
        if all_reports:
            analysis_results['trends'] = self._analyze_trends(all_reports)
            analysis_results['summary'] = self._generate_summary(all_reports[-1])
            self.report_history.extend(all_reports)
        return analysis_results

    def _check_alerts(self, report: AWRReport) -> List[Dict]:
        """Check the performance alert conditions."""
        alerts = []
        # Instance efficiency alerts
        efficiency_analyzer = InstanceEfficiencyAnalyzer(report.instance_efficiency)
        health_analysis = efficiency_analyzer.analyze_health()
        for metric_name, info in health_analysis['detailed'].items():
            if info['status'] == 'critical':
                alerts.append({
                    'level': 'CRITICAL',
                    'metric': metric_name,
                    'value': info['value'],
                    'threshold': info['threshold_critical'],
                    'message': f"{metric_name} below critical threshold: "
                               f"{info['value']:.1f}% < {info['threshold_critical']}%"
                })
            elif info['status'] == 'warning':
                alerts.append({
                    'level': 'WARNING',
                    'metric': metric_name,
                    'value': info['value'],
                    'threshold': info['threshold_min'],
                    'message': f"{metric_name} below recommended threshold: "
                               f"{info['value']:.1f}% < {info['threshold_min']}%"
                })
        # Wait event alerts
        if report.top_events:
            top_event = report.top_events[0]
            if top_event.get('Total Wait Time (s)', 0) > 300:  # more than 5 minutes
                alerts.append({
                    'level': 'WARNING',
                    'metric': 'Top Wait Event',
                    'value': top_event.get('Event', 'Unknown'),
                    'threshold': '300s',
                    'message': f"Top wait event '{top_event.get('Event')}' waited too long: "
                               f"{top_event.get('Total Wait Time (s)', 0):.1f}s"
                })
        return alerts

    def _analyze_trends(self, reports: List[AWRReport]) -> List[Dict]:
        """Analyze performance trends."""
        trends = []
        for report in reports[-10:]:  # analyze the latest 10 reports
            trend_point = {
                # ISO string keeps the payload JSON-serializable for the
                # interactive report template.
                'date': report.snapshot_range[0].isoformat() if report.snapshot_range[0] else '',
                'logical_reads': report.load_profile.get('Logical Reads', 0),
                'physical_reads': report.load_profile.get('Physical Reads', 0),
                'executions': report.load_profile.get('Executions', 0),
                'buffer_hit': report.instance_efficiency.get('Buffer Hit %', 0),
                'db_time': report.elapsed_minutes
            }
            trends.append(trend_point)
        return trends

    def _generate_summary(self, latest_report: AWRReport) -> Dict:
        """Generate a performance summary."""
        return {
            'db_name': latest_report.db_name,
            'instance_name': latest_report.instance_name,
            'report_period': f"{latest_report.snapshot_range[0]} to {latest_report.snapshot_range[1]}",
            'elapsed_minutes': latest_report.elapsed_minutes,
            'total_logical_reads': latest_report.load_profile.get('Logical Reads', 0),
            'total_physical_reads': latest_report.load_profile.get('Physical Reads', 0),
            'buffer_hit_rate': latest_report.instance_efficiency.get('Buffer Hit %', 0),
            'top_wait_event': latest_report.top_events[0].get('Event', 'N/A') if latest_report.top_events else 'N/A'
        }

    def generate_daily_report(self) -> Optional[Path]:
        """Generate the daily performance report."""
        self.logger.info("Generating the daily performance report")
        # Collect reports from the last 24 hours
        report_files = self.collect_awr_reports(days_back=1)
        if not report_files:
            self.logger.warning("No AWR reports available")
            return None
        # Analyze the reports
        analysis = self.analyze_reports(report_files)
        # Render the dashboard charts
        visualizer = AWRVisualizer()
        fig = visualizer.create_performance_dashboard(self.report_history[-24:])  # last 24 reports
        # Save the chart
        chart_path = self.reports_dir / f"performance_dashboard_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
        fig.savefig(chart_path, dpi=300, bbox_inches='tight')
        # Render the HTML report
        html_generator = InteractiveReportGenerator()
        html_content = html_generator.generate_html({
            'report_date': datetime.now().strftime('%Y-%m-%d'),
            'generation_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'db_name': analysis['summary'].get('db_name', 'Unknown'),
            'instance_name': analysis['summary'].get('instance_name', 'Unknown'),
            'start_time': analysis['summary'].get('report_period', '').split(' to ')[0],
            'end_time': analysis['summary'].get('report_period', '').split(' to ')[1] if ' to ' in analysis['summary'].get('report_period', '') else '',
            'key_metrics': [
                {'name': 'Buffer Hit Rate', 'value': f"{analysis['summary'].get('buffer_hit_rate', 0):.1f}%", 'status': 'healthy', 'description': 'Cache hit ratio'},
                {'name': 'Logical Reads/s', 'value': f"{analysis['summary'].get('total_logical_reads', 0):.0f}", 'status': 'healthy', 'description': 'Logical reads per second'},
                {'name': 'Physical Reads/s', 'value': f"{analysis['summary'].get('total_physical_reads', 0):.0f}", 'status': 'warning', 'description': 'Physical reads per second'},
                {'name': 'Top Wait Event', 'value': analysis['summary'].get('top_wait_event', 'N/A'), 'status': 'healthy', 'description': 'Dominant wait event'}
            ],
            'top_sql': self._get_top_sql_recommendations(),
            'trend_data': analysis['trends'],
            'wait_events': analysis.get('wait_events', [])
        })
        # Save the HTML report
        html_path = self.reports_dir / f"awr_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
        with open(html_path, 'w', encoding='utf-8') as f:
            f.write(html_content)
        # Send alert emails if needed
        if analysis['alerts']:
            self.send_alert_email(analysis['alerts'], html_path, chart_path)
        self.logger.info(f"Daily report generated: {html_path}")
        return html_path

    def _get_top_sql_recommendations(self) -> List[Dict]:
        """Fetch Top SQL tuning recommendations."""
        # A real implementation would query the database for the actual
        # Top SQL; mock data is returned here.
        return [
            {
                'sql_id': 'abc123def',
                'elapsed_time': '45.2s',
                'cpu_time': '12.3s',
                'physical_reads': '12500',
                'executions': '150',
                'recommendation': 'Consider adding an index'
            },
            {
                'sql_id': 'xyz789uvw',
                'elapsed_time': '32.1s',
                'cpu_time': '28.7s',
                'physical_reads': '320',
                'executions': '85',
                'recommendation': 'Review the execution plan'
            }
        ]

    def send_alert_email(self, alerts: List[Dict], report_path: Path, chart_path: Path):
        """Send an alert email."""
        if not self.config.get('email_enabled', False):
            return
        try:
            # Compose the message
            msg = MIMEMultipart()
            msg['From'] = self.config['email_from']
            msg['To'] = ', '.join(self.config['email_to'])
            msg['Subject'] = (f"[AWR alert] {len(alerts)} performance issue(s) need attention "
                              f"- {datetime.now().strftime('%Y-%m-%d %H:%M')}")
            # Message body
            body = f"""
            <h2>🔔 AWR Performance Monitoring Alert</h2>
            <p>{len(alerts)} performance issue(s) need attention:</p>
            <ul>
            """
            for alert in alerts:
                body += f"<li><strong>{alert['level']}</strong>: {alert['message']}</li>\n"
            body += f"""
            </ul>
            <p>See the attachments for the full report.</p>
            <p>Generated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
            """
            msg.attach(MIMEText(body, 'html'))
            # Attach the HTML report
            with open(report_path, 'rb') as f:
                report_attachment = MIMEApplication(f.read(), Name=report_path.name)
                report_attachment['Content-Disposition'] = f'attachment; filename="{report_path.name}"'
                msg.attach(report_attachment)
            # Attach the dashboard chart
            with open(chart_path, 'rb') as f:
                chart_attachment = MIMEApplication(f.read(), Name=chart_path.name)
                chart_attachment['Content-Disposition'] = f'attachment; filename="{chart_path.name}"'
                msg.attach(chart_attachment)
            # Send the message
            with smtplib.SMTP(self.config['smtp_server'], self.config['smtp_port']) as server:
                server.starttls()
                server.login(self.config['email_user'], self.config['email_password'])
                server.send_message(msg)
            self.logger.info(f"Alert email sent to: {self.config['email_to']}")
        except Exception as e:
            self.logger.error(f"Failed to send alert email: {e}")

    def run_scheduled_monitoring(self):
        """Run the scheduled monitoring loop."""
        self.logger.info("Starting the automated AWR monitoring system")
        # Generate the daily report at 02:00 every day
        schedule.every().day.at("02:00").do(self.generate_daily_report)
        # Check for immediate alerts hourly
        schedule.every().hour.do(self._check_immediate_alerts)
        while True:
            schedule.run_pending()
            time.sleep(60)  # poll pending jobs once a minute

    def _check_immediate_alerts(self):
        """Check for immediate alerts."""
        # Real-time monitoring logic could live here, e.g. probing the
        # current database state and alerting on severe problems at once.
        pass
```
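The `_download_awr_report` stub above is where environment-specific logic belongs. One realistic option is to pull the report straight out of the database with the `DBMS_WORKLOAD_REPOSITORY.AWR_REPORT_HTML` pipelined function. Below is a hedged sketch using the python-oracledb driver; the connection details and snapshot IDs are placeholders (snapshot IDs come from `DBA_HIST_SNAPSHOT`):
```python
# Sketch: fetch AWR HTML directly from the database (pip install oracledb).
# All connection parameters and snapshot IDs are placeholders.
import oracledb


def fetch_awr_html(dsn: str, user: str, password: str,
                   dbid: int, inst_num: int,
                   begin_snap: int, end_snap: int) -> str:
    sql = """
        SELECT output
        FROM TABLE(DBMS_WORKLOAD_REPOSITORY.AWR_REPORT_HTML(
            :dbid, :inst_num, :begin_snap, :end_snap))
    """
    with oracledb.connect(user=user, password=password, dsn=dsn) as conn:
        with conn.cursor() as cur:
            cur.execute(sql, dict(dbid=dbid, inst_num=inst_num,
                                  begin_snap=begin_snap, end_snap=end_snap))
            # Each fetched row is one line of the HTML report.
            return "\n".join(row[0] or "" for row in cur)
```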
### 5.2 Configuration and Deployment
```yaml
# config.yaml - system configuration file
database:
host: "db-server.example.com"
port: 1521
service_name: "ORCL"
username: "monitor_user"
password: "secure_password"
awr:
report_dir: "/u01/app/oracle/awr_reports"
retention_days: 30
collection_interval_hours: 1
monitoring:
daily_report_time: "02:00"
alert_thresholds:
buffer_hit_rate: 90
physical_reads_per_sec: 1000
db_time_per_minute: 30
top_wait_event_seconds: 300
email:
enabled: true
smtp_server: "smtp.example.com"
smtp_port: 587
email_from: "awr-monitor@example.com"
email_to:
- "dba-team@example.com"
- "dev-lead@example.com"
email_user: "monitor@example.com"
email_password: "email_password"
storage:
reports_dir: "./reports"
data_dir: "./data"
max_reports: 1000
logging:
level: "INFO"
file: "./logs/awr_monitor.log"
max_size_mb: 100
backup_count: 5
```
### 5.3 Usage Example
```python
# main.py - system entry point
import yaml
from pathlib import Path


def main():
    # Load the configuration
    config_path = Path("config.yaml")
    with open(config_path, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)
    # Initialize the monitoring system
    monitor = AWRMonitoringSystem(config)
    # Run one analysis pass (useful for testing)
    print("Running a one-off analysis...")
    report_path = monitor.generate_daily_report()
    if report_path:
        print(f"Report generated: {report_path}")
        # Open the report in the default browser
        import webbrowser
        webbrowser.open(f"file://{report_path.absolute()}")
    # Uncomment the line below to start scheduled monitoring
    # monitor.run_scheduled_monitoring()


if __name__ == "__main__":
    main()
```
This automation has worked well in production: every night it generates a performance report and emails it to the DBA and development teams, and when it detects a problem it alerts immediately so the team can respond quickly.
## 6. Advanced Techniques and Best Practices
Over a long period of analyzing AWR reports with Python I have collected some practical techniques that will help you get more out of this tooling.
### 6.1 Handling AWR Reports from Different Oracle Versions
AWR report formats can vary between Oracle versions, so the parser needs to adapt.
```python
import logging

# AWRParser defines no logger of its own, so this subclass uses a module logger.
logger = logging.getLogger(__name__)


class AdaptiveAWRParser(AWRParser):
    """Adaptive AWR parser that supports different Oracle versions."""

    VERSION_PATTERNS = {
        '11g': r'Oracle Database 11g',
        '12c': r'Oracle Database 12c',
        '19c': r'Oracle Database 19c',
        '21c': r'Oracle Database 21c'
    }

    def detect_version(self) -> str:
        """Detect the Oracle version of the AWR report."""
        content = self.soup.text[:5000]  # inspect the first 5000 characters
        for version, pattern in self.VERSION_PATTERNS.items():
            if re.search(pattern, content, re.IGNORECASE):
                return version
        # Fall back to keyword-based detection
        if 'WORKLOAD REPOSITORY' in content:
            if 'ASH Report' in content:
                return '12c+'
            elif 'ADDM Report' in content:
                return '11g+'
        return 'unknown'

    def parse_with_version_adaptation(self):
        """Parse the report with a version-specific strategy."""
        version = self.detect_version()
        logger.info(f"Detected Oracle version: {version}")
        # Pick the parsing strategy for the detected version
        if version.startswith('11'):
            return self._parse_11g_format()
        elif version.startswith('12') or version.startswith('19') or version.startswith('21'):
            return self._parse_12c_plus_format()
        else:
            # Generic fallback
            return self._parse_generic_format()

    def _parse_11g_format(self):
        """Parse an 11g-format AWR report."""
        # 11g-specific parsing logic
        self.extract_basic_info()
        # The Load Profile table in 11g carries a summary attribute
        load_profile_tables = self.soup.find_all('table', summary='Load Profile')
        if load_profile_tables:
            self._parse_load_profile_11g(load_profile_tables[0])
        # ... other 11g-specific parsing

    def _parse_12c_plus_format(self):
        """Parse a 12c-or-later AWR report."""
        self.extract_basic_info()
        # 12c+ reports may use different CSS classes or structure
        load_profile_section = self.soup.find('h2', string=re.compile('Load Profile', re.IGNORECASE))
        if load_profile_section:
            table = load_profile_section.find_next('table')
            if table:
                self._parse_load_profile_modern(table)
        # ... other 12c+ specific parsing

    def _parse_generic_format(self):
        """Generic fallback: try several strategies in turn."""
        methods = [
            self._try_parse_by_summary_attribute,
            self._try_parse_by_table_structure,
            self._try_parse_by_text_pattern
        ]
        for method in methods:
            try:
                result = method()
                if result:
                    return result
            except Exception as e:
                logger.warning(f"Parse strategy failed: {e}")
                continue
        raise ValueError("Unable to parse this AWR report format")
```
### 6.2 Performance Optimization Techniques
When you process large numbers of AWR reports, throughput starts to matter. A few optimizations help:
```python
import gzip
import hashlib
import logging
import pickle
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
from pathlib import Path


class OptimizedAWRProcessor:
    """AWR processor optimized with caching and parallelism."""

    def __init__(self, cache_dir='./cache'):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self.logger = logging.getLogger(__name__)

    @lru_cache(maxsize=100)
    def parse_report_cached(self, report_path: str) -> AWRReport:
        """Parse a report through an on-disk cache."""
        cache_key = hashlib.md5(report_path.encode()).hexdigest()
        cache_file = self.cache_dir / f"{cache_key}.pkl.gz"
        # Check the cache first
        if cache_file.exists():
            try:
                with gzip.open(cache_file, 'rb') as f:
                    return pickle.load(f)
            except Exception:
                pass  # corrupted cache entry; re-parse
        # Parse and cache
        with open(report_path, 'r', encoding='utf-8') as f:
            content = f.read()
        parser = AWRParser(content)
        parser.extract_basic_info()
        parser.parse_load_profile()
        report = parser.report
        # Write the parsed result back to the cache
        with gzip.open(cache_file, 'wb') as f:
            pickle.dump(report, f)
        return report

    def batch_process_reports(self, report_paths: List[str], max_workers: int = 4) -> List[AWRReport]:
        """Process reports in parallel."""
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all tasks
            future_to_path = {
                executor.submit(self.parse_report_cached, path): path
                for path in report_paths
            }
            # Collect the results
            for future in as_completed(future_to_path):
                path = future_to_path[future]
                try:
                    result = future.result(timeout=30)  # 30-second timeout
                    results.append(result)
                except Exception as e:
                    self.logger.error(f"Failed to process report {path}: {e}")
        return results

    def incremental_analysis(self, new_report: AWRReport, historical_data: List[AWRReport]) -> Dict:
        """Incremental analysis: only examine what changed."""
        if not historical_data:
            # _full_analysis (not shown) would run the complete pipeline
            return self._full_analysis([new_report])
        # Only compare against the most recent report
        last_report = historical_data[-1]
        # Compute the change rates
        changes = {}
        for key in ['Logical Reads', 'Physical Reads', 'Executions']:
            if key in new_report.load_profile and key in last_report.load_profile:
                old_val = last_report.load_profile[key]
                new_val = new_report.load_profile[key]
                if old_val > 0:
                    change_pct = (new_val - old_val) / old_val * 100
                    changes[key] = {
                        'old': old_val,
                        'new': new_val,
                        'change_pct': change_pct,
                        'status': 'increase' if change_pct > 10 else 'decrease' if change_pct < -10 else 'stable'
                    }
        # Flag abnormal swings
        alerts = []
        for metric, change in changes.items():
            if abs(change['change_pct']) > 50:  # more than a 50% swing
                alerts.append({
                    'metric': metric,
                    'change': f"{change['change_pct']:.1f}%",
                    'old_value': change['old'],
                    'new_value': change['new'],
                    'severity': 'high' if abs(change['change_pct']) > 100 else 'medium'
                })
        return {
            'changes': changes,
            'alerts': alerts,
            'summary': {
                'total_metrics_changed': len([c for c in changes.values() if c['status'] != 'stable']),
                'significant_changes': len(alerts)
            }
        }
```
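A quick way to exercise the incremental path, assuming `latest_report` and `historical_reports` (a non-empty list) came out of `batch_process_reports`:
```python
# Sketch: compare the newest report against parsed history.
processor = OptimizedAWRProcessor()
delta = processor.incremental_analysis(latest_report, historical_reports)
for alert in delta['alerts']:
    print(f"[{alert['severity'].upper()}] {alert['metric']} "
          f"changed by {alert['change']}")
print(delta['summary'])
```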
### 6.3 Integrating with Existing Monitoring Systems
In real projects, the AWR analysis tooling usually has to plug into an existing monitoring stack:
```python
import logging
from datetime import datetime


class MonitoringSystemIntegration:
    """Integration with external monitoring systems."""

    def __init__(self, prometheus_url=None, grafana_url=None, elasticsearch_url=None):
        self.prometheus_url = prometheus_url
        self.grafana_url = grafana_url
        self.elasticsearch_url = elasticsearch_url
        self.logger = logging.getLogger(__name__)

    def export_to_prometheus(self, metrics: Dict, job_name: str = 'oracle_awr'):
        """Export metrics to Prometheus."""
        if not self.prometheus_url:
            return
        from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

        registry = CollectorRegistry()
        # Define the gauges
        buffer_hit_gauge = Gauge(
            'oracle_buffer_hit_ratio',
            'Oracle Buffer Hit Ratio',
            ['db_name', 'instance'],
            registry=registry
        )
        physical_reads_gauge = Gauge(
            'oracle_physical_reads_per_sec',
            'Oracle Physical Reads per Second',
            ['db_name', 'instance'],
            registry=registry
        )
        # Set the gauge values
        buffer_hit_gauge.labels(
            db_name=metrics.get('db_name', 'unknown'),
            instance=metrics.get('instance_name', 'unknown')
        ).set(metrics.get('buffer_hit_rate', 0))
        physical_reads_gauge.labels(
            db_name=metrics.get('db_name', 'unknown'),
            instance=metrics.get('instance_name', 'unknown')
        ).set(metrics.get('physical_reads_per_sec', 0))
        # Push to the Prometheus PushGateway
        try:
            push_to_gateway(
                self.prometheus_url,
                job=job_name,
                registry=registry
            )
            self.logger.info("Metrics pushed to Prometheus")
        except Exception as e:
            self.logger.error(f"Failed to push metrics to Prometheus: {e}")

    def create_grafana_dashboard(self, metrics_history: List[Dict]):
        """Create a Grafana dashboard."""
        if not self.grafana_url:
            return
        # Prepare the dashboard definition
        dashboard = {
            "dashboard": {
                "title": f"Oracle AWR Monitoring - {metrics_history[0].get('db_name', 'Unknown')}",
                "panels": self._create_grafana_panels(metrics_history),
                "time": {
                    "from": "now-7d",
                    "to": "now"
                }
            }
        }
        # The Grafana API call goes here; in a real project the
        # grafana_api library can be used.

    def _create_grafana_panels(self, metrics_history: List[Dict]) -> List[Dict]:
        """Build the Grafana panel definitions."""
        panels = []
        # Buffer hit ratio panel
        panels.append({
            "title": "Buffer Hit Ratio",
            "type": "graph",
            "targets": [{
                "expr": 'oracle_buffer_hit_ratio{job="oracle_awr"}',
                "legendFormat": "{{instance}}"
            }],
            "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0}
        })
        # Physical reads panel
        panels.append({
            "title": "Physical Reads per Second",
            "type": "graph",
            "targets": [{
                "expr": 'oracle_physical_reads_per_sec{job="oracle_awr"}',
                "legendFormat": "{{instance}}"
            }],
            "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0}
        })
        return panels

    def send_to_elasticsearch(self, analysis_results: Dict, index_name: str = 'oracle-awr'):
        """Send the analysis results to Elasticsearch."""
        try:
            from elasticsearch import Elasticsearch

            es = Elasticsearch([self.elasticsearch_url])
            # Prepare the document
            doc = {
                '@timestamp': datetime.now().isoformat(),
                'db_name': analysis_results.get('db_name'),
                'instance_name': analysis_results.get('instance_name'),
                'analysis': analysis_results,
                'alerts': analysis_results.get('alerts', []),
                'summary': analysis_results.get('summary', {})
            }
            # Index the document
            es.index(
                index=index_name,
                body=doc,
                id=f"{analysis_results.get('db_name')}_{analysis_results.get('instance_name')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            )
            self.logger.info("Analysis results sent to Elasticsearch")
        except ImportError:
            self.logger.warning("The Elasticsearch client is not installed")
        except Exception as e:
            self.logger.error(f"Failed to send to Elasticsearch: {e}")
```
### 6.4 Error Handling and Logging
Robust error handling is essential in production:
```python
import logging
import traceback
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path


class RobustAWRProcessor:
    """Robust AWR processor with thorough error handling."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.setup_error_handling()

    def setup_error_handling(self):
        """Register the error handlers."""
        self.error_handlers = {
            'parse_error': self._handle_parse_error,
            'file_error': self._handle_file_error,
            'network_error': self._handle_network_error,
            'data_error': self._handle_data_error
        }

    @contextmanager
    def safe_execution(self, operation_name: str):
        """Context manager for guarded execution."""
        try:
            yield
        except FileNotFoundError as e:
            self.logger.error(f"[{operation_name}] File not found: {e}")
            self.error_handlers['file_error'](e)
        except PermissionError as e:
            self.logger.error(f"[{operation_name}] Permission denied: {e}")
            self.error_handlers['file_error'](e)
        except ConnectionError as e:
            self.logger.error(f"[{operation_name}] Connection error: {e}")
            self.error_handlers['network_error'](e)
        except ValueError as e:
            self.logger.error(f"[{operation_name}] Data error: {e}")
            self.error_handlers['data_error'](e, operation_name)
        except Exception as e:
            self.logger.error(f"[{operation_name}] Unexpected error: {e}")
            self.logger.error(traceback.format_exc())
            raise

    def _handle_parse_error(self, error: Exception, report_content: str = None):
        """Handle parse errors."""
        self.logger.error(f"Failed to parse AWR report: {error}")
        # Save the raw content for debugging
        if report_content:
            debug_file = Path(f"./debug/parse_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html")
            debug_file.parent.mkdir(exist_ok=True)
            with open(debug_file, 'w', encoding='utf-8') as f:
                f.write(report_content[:5000])  # keep the first 5000 characters only
        # Fall back to an alternative parsing method (_fallback_parse not shown)
        return self._fallback_parse(report_content) if report_content else None

    def _handle_file_error(self, error: Exception):
        """Handle file errors."""
        self.logger.error(f"File operation failed: {error}")
        # A backup location could be tried here

    def _handle_network_error(self, error: Exception):
        """Handle network errors."""
        self.logger.error(f"Network operation failed: {error}")
        # A retry with backoff could be attempted here