以下是对给定 Python 代码各步骤功能的详细分析:
### 导入必要的库
```python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap, BoundaryNorm
from matplotlib.patches import Patch
```
- `pandas` 用于数据处理和分析,提供了 `DataFrame` 等数据结构,方便对表格数据进行操作。
- `numpy` 用于数值计算,提供了高效的多维数组对象和各种数学函数。
- `matplotlib.pyplot` 用于数据可视化,提供了类似于 MATLAB 的绘图接口。
- `ListedColormap` 和 `BoundaryNorm` 用于定义颜色映射和边界规范。
- `Patch` 用于创建图例中的补丁对象。
### 定义文件路径和绘图参数
```python
infile = '第三问_患者清单.xlsx'
outfile = '第三问_个体最优方案.xlsx'
plot_file_1 = '第三问_样本患者月度方案图.png'
plot_file_2 = '第三问_样本患者干预轨迹图.png'
plot_file_3 = '第三问_总体干预效果对比图.png'
plot_file_4 = '第三问_成本效果散点图.png'
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['figure.facecolor'] = 'white'
plt.rcParams['axes.facecolor'] = 'white'
```
- 定义输入文件、输出文件和绘图文件的文件名。
- 设置 `matplotlib` 的绘图参数,包括字体、负号显示、图形背景颜色和坐标轴背景颜色。
### 定义颜色和常量
```python
PLAN_CMAP = ListedColormap(['#DCEAF7', '#7DB7E8', '#2E73B8'])
PLAN_NORM = BoundaryNorm([0.5, 1.5, 2.5, 3.5], PLAN_CMAP.N)
TRAJ_COLORS = ['#4E79A7', '#F28E2B', '#E15759']
BOX_COLORS = ['#C9D6DF', '#8FD694']
SCATTER_COLORS = {1: '#4E79A7', 2: '#F28E2B', 3: '#E15759'}
GRID_COLOR = '#D9E2EC'
TEXT_COLOR = '#2D3142'
ALPHA = 0.7
MONTHS = 6
GRADE_COST = {1: 30, 2: 80, 3: 130}
ACT_COST = {1: 3, 2: 5, 3: 8}
ETA = {1: 0.01, 2: 0.02, 3: 0.03}
```
- 定义绘图所需的颜色映射、颜色列表和颜色字典。
- 定义一些常量,包括权重系数 `ALPHA`、月份数 `MONTHS`、调理等级成本 `GRADE_COST`、活动强度成本 `ACT_COST` 和衰减系数 `ETA`。
### 定义辅助函数
```python
def grade_by_score(score):
if score <= 58:
return 1
elif score <= 61:
return 2
else:
return 3
def feasible_strengths(age_group, act_score):
age_set = {1,2,3} if age_group in [1,2] else ({1,2} if age_group in [3,4] else {1})
act_set = {1} if act_score < 40 else ({1,2} if act_score < 60 else {1,2,3})
return sorted(age_set & act_set)
def activity_drop(a, f):
if f < 5:
return 0.0
return 0.03 * (a - 1) + 0.01 * (f - 5)
def next_score(s, g, a, f):
drop = ETA[g] + activity_drop(a, f)
nxt = max(0.0, s * (1 - drop))
return int(round(nxt))
def month_cost(g, a, f):
return GRADE_COST[g] + 4 * f * ACT_COST[a]
```
- `grade_by_score`:根据痰湿积分返回对应的调理等级。
- `feasible_strengths`:根据年龄组和活动量表总分返回可行的活动强度集合。
- `activity_drop`:根据活动强度 `a` 和每周频次 `f` 计算活动下降率。
- `next_score`:根据当前痰湿积分 `s`、调理等级 `g`、活动强度 `a` 和每周频次 `f` 计算下一个月的痰湿积分。
- `month_cost`:根据调理等级 `g`、活动强度 `a` 和每周频次 `f` 计算当月的成本。
### 定义优化函数
```python
def optimize_one(row):
sid = int(row['样本ID'])
s0 = int(round(float(row['痰湿质'])))
age_group = int(row['年龄组'])
act_score = float(row['活动量表总分(ADL总分+IADL总分)'])
intensities = feasible_strengths(age_group, act_score)
dp = [dict() for _ in range(MONTHS + 1)]
dp[0][s0] = (0.0, 0.0, None, None)
for t in range(MONTHS):
for s, (val, cost, prev, action) in dp[t].items():
g = grade_by_score(s)
for a in intensities:
for f in range(1, 11):
c = month_cost(g, a, f)
new_cost = cost + c
if new_cost > 2000:
continue
s2 = next_score(s, g, a, f)
if s2 not in dp[t + 1]:
dp[t + 1][s2] = (0.0, new_cost, s, (g, a, f))
else:
_, old_cost, _, _ = dp[t + 1][s2]
if new_cost < old_cost:
dp[t + 1][s2] = (0.0, new_cost, s, (g, a, f))
best = None
for s6, (_, cost6, prev, action) in dp[MONTHS].items():
J = ALPHA * (s6 / max(s0, 1)) + (1 - ALPHA) * (cost6 / 2000)
item = (J, s6, cost6)
if best is None or item < best[:3]:
best = (J, s6, cost6)
if best is None:
return None, None
_, s6_best, cost_best = best
plan = []
cur_s = s6_best
for t in range(MONTHS, 0, -1):
_, c, prev_s, act = dp[t][cur_s]
g, a, f = act
plan.append((t, prev_s, g, a, f, cur_s, month_cost(g, a, f)))
cur_s = prev_s
plan.reverse()
summary = {
'样本ID': sid,
'初始痰湿积分': s0,
'年龄组': age_group,
'活动总分': act_score,
'可选活动强度': '、'.join(map(str, intensities)),
'6个月后痰湿积分': s6_best,
'总成本': cost_best,
'目标函数值J': round(ALPHA * (s6_best / max(s0, 1)) + (1 - ALPHA) * (cost_best / 2000), 6),
'推荐强度主模式': pd.Series([x[3] for x in plan]).mode().iloc[0],
'推荐频次中位数': int(pd.Series([x[4] for x in plan]).median())
}
detail = pd.DataFrame(plan, columns=['月份','月初痰湿积分','调理等级','活动强度','每周频次','月末痰湿积分','当月成本'])
detail.insert(0, '样本ID', sid)
return summary, detail
```
- 该函数用于为单个患者优化干预方案。使用动态规划算法,根据患者的初始信息计算最优的干预方案,并返回方案的总结信息和详细信息。
### 读取数据并处理
```python
pat = pd.read_excel(infile, sheet_name='痰湿体质患者清单')
summaries, details = [], []
for _, row in pat.iterrows():
s, d = optimize_one(row)
if s is not None:
summaries.append(s)
details.append(d)
summary_df = pd.DataFrame(summaries).sort_values('样本ID') if summaries else pd.DataFrame()
detail_df = pd.concat(details, ignore_index=True) if details else pd.DataFrame()
```
- 读取输入文件中的数据。
- 遍历患者数据,调用 `optimize_one` 函数为每个患者计算最优方案,并将总结信息和详细信息分别存储在列表中。
- 将总结信息和详细信息列表转换为 `DataFrame` 对象,并对总结信息按样本 ID 进行排序,将详细信息合并为一个 `DataFrame`。
### 选择样本患者
```python
if not detail_df.empty:
preferred_ids = [1, 2, 3]
available = summary_df['样本ID'].tolist()
sample_ids = [x for x in preferred_ids if x in available]
if len(sample_ids) < 3:
extra = [x for x in available if x not in sample_ids][:3-len(sample_ids)]
sample_ids += extra
sample_df = detail_df[detail_df['样本ID'].isin(sample_ids)].copy()
else:
sample_ids = []
sample_df = pd.DataFrame()
```
- 如果详细信息 `DataFrame` 不为空,选择样本患者的 ID,并筛选出这些患者的详细信息。
### 保存结果到 Excel 文件
```python
with pd.ExcelWriter(outfile) as w:
summary_df.to_excel(w, sheet_name='患者最优方案汇总', index=False)
detail_df.to_excel(w, sheet_name='全部患者月度方案', index=False)
sample_df.to_excel(w, sheet_name='样本方案', index=False)
```
- 使用 `pandas` 的 `ExcelWriter` 将总结信息、详细信息和样本信息写入输出文件的不同工作表中。
### 绘制样本患者月度方案图
```python
if not sample_df.empty:
pivot = sample_df.pivot(index='样本ID', columns='月份', values='活动强度').reindex(index=sample_ids)
fig, ax = plt.subplots(figsize=(10, max(3.5, len(sample_ids) * 1.2)))
im = ax.imshow(pivot.values, aspect='auto', cmap=PLAN_CMAP, norm=PLAN_NORM)
ax.set_xticks(range(len(pivot.columns)))
ax.set_xticklabels([f'{int(c)}月' for c in pivot.columns])
ax.set_yticks(range(len(pivot.index)))
ax.set_yticklabels([f'样本{int(i)}' for i in pivot.index])
ax.set_title('第三问:样本患者月度方案图', fontsize=13, fontweight='bold')
ax.set_xlabel('月份')
ax.set_ylabel('患者')
for i, sid in enumerate(pivot.index):
sub = sample_df[sample_df['样本ID'] == sid].sort_values('月份')
for j, (_, row) in enumerate(sub.iterrows()):
txt = f"{int(row['调理等级'])}/{int(row['活动强度'])}/{int(row['每周频次'])}"
txt_color = 'white' if int(row['活动强度']) >= 2 else TEXT_COLOR
ax.text(j, i, txt, ha='center', va='center', fontsize=8, color=txt_color)
legend_handles = [Patch(facecolor=PLAN_CMAP.colors[i-1], edgecolor='none', label=f'{i}级活动强度') for i in [1,2,3]]
ax.legend(handles=legend_handles, loc='upper center', bbox_to_anchor=(0.5, -0.12), ncol=3, frameon=False)
fig.tight_layout()
fig.savefig(plot_file_1, dpi=300, bbox_inches='tight')
plt.close(fig)
```
- 如果样本信息 `DataFrame` 不为空,绘制样本患者月度方案图,并保存为图片文件。
### 绘制样本患者干预轨迹图
```python
if not sample_df.empty:
fig, ax = plt.subplots(figsize=(9, 5))
for idx, sid in enumerate(sample_ids):
sub = sample_df[sample_df['样本ID'] == sid].sort_values('月份')
if sub.empty:
continue
x = [0] + sub['月份'].tolist()
y = [sub.iloc[0]['月初痰湿积分']] + sub['月末痰湿积分'].tolist()
color = TRAJ_COLORS[idx % len(TRAJ_COLORS)]
ax.plot(x, y, marker='o', linewidth=2.5, markersize=6, label=f'样本{sid}', color=color)
ax.set_xticks(range(0, MONTHS + 1))
ax.set_xlabel('月份')
ax.set_ylabel('痰湿积分')
ax.set_title('第三问:样本患者干预轨迹图', fontsize=13, fontweight='bold')
ax.grid(axis='y', linestyle='--', alpha=0.35, color=GRID_COLOR)
ax.legend(frameon=False)
fig.tight_layout()
fig.savefig(plot_file_2, dpi=300, bbox_inches='tight')
plt.close(fig)
```
- 如果样本信息 `DataFrame` 不为空,绘制样本患者干预轨迹图,并保存为图片文件。
### 绘制总体干预效果对比图
```python
if not summary_df.empty:
fig, ax = plt.subplots(figsize=(7, 5))
box = ax.boxplot(
[summary_df['初始痰湿积分'].dropna(), summary_df['6个月后痰湿积分'].dropna()],
labels=['干预前', '6个月后'], patch_artist=True,
medianprops=dict(color='#2D3142', linewidth=2),
whiskerprops=dict(color='#7B8794', linewidth=1.2),
capprops=dict(color='#7B8794', linewidth=1.2),
boxprops=dict(color='#7B8794', linewidth=1.2)
)
for patch, c in zip(box['boxes'], BOX_COLORS):
patch.set_facecolor(c)
patch.set_alpha(0.85)
means = [summary_df['初始痰湿积分'].mean(), summary_df['6个月后痰湿积分'].mean()]
ax.plot([1, 2], means, color='#355C7D', marker='D', linewidth=1.8, label='均值')
ax.set_ylabel('痰湿积分')
ax.set_title('第三问:总体干预效果对比', fontsize=13, fontweight='bold')
ax.grid(axis='y', linestyle='--', alpha=0.35, color=GRID_COLOR)
ax.legend(frameon=False)
fig.tight_layout()
fig.savefig(plot_file_3, dpi=300, bbox_inches='tight')
plt.close(fig)
```
- 如果总结信息 `DataFrame` 不为空,绘制总体干预效果对比图,并保存为图片文件。
### 绘制成本 - 效果散点图
```python
if not summary_df.empty:
tmp = summary_df.copy()
tmp['积分下降'] = tmp['初始痰湿积分'] - tmp['6个月后痰湿积分']
fig, ax = plt.subplots(figsize=(8, 5))
for strength in sorted(tmp['推荐强度主模式'].dropna().unique()):
sub = tmp[tmp['推荐强度主模式'] == strength]
ax.scatter(
sub['总成本'], sub['积分下降'],
label=f'{int(strength)}级强度',
color=SCATTER_COLORS.get(int(strength), '#4E79A7'),
s=55, alpha=0.85, edgecolors='white', linewidths=0.8
)
ax.set_xlabel('总成本')
ax.set_ylabel('痰湿积分下降')
ax.set_title('第三问:成本 - 效果散点图', fontsize=13, fontweight='bold')
ax.grid(linestyle='--', alpha=0.35, color=GRID_COLOR)
ax.legend(frameon=False)
fig.tight_layout()
fig.savefig(plot_file_4, dpi=300, bbox_inches='tight')
plt.close(fig)
```
- 如果总结信息 `DataFrame` 不为空,绘制成本 - 效果散点图,并保存为图片