# 线性规划对偶问题实战:从零推导到Python代码实现
在优化领域,线性规划的对偶理论堪称一块瑰宝。它不仅仅是数学上的优雅对称,更是解决实际问题的强大工具。想象一下,当你面对一个复杂的资源分配问题,直接求解原问题可能计算量巨大,但通过巧妙的转换,对偶问题却能提供更高效的求解路径。这种“换个角度看世界”的智慧,正是对偶思想的精髓所在。
我最初接触对偶概念时,总觉得它像魔术一样神秘。直到亲手用代码实现了几次转换和求解,才真正体会到其中的妙处。对偶不仅仅是理论上的对应关系,它在算法设计、灵敏度分析、甚至经济学解释中都有深刻的应用。本文将带你从最基础的原理出发,一步步推导对偶问题的构建方法,最后用Python代码实现完整的求解流程,让你不仅能理解理论,更能动手实践。
## 1. 对偶问题的直观理解与数学基础
### 1.1 为什么要研究对偶?
线性规划的对偶问题并非数学家的智力游戏,而是有着深刻的实用价值。在实际应用中,对偶理论至少在三方面发挥着关键作用:
1. **计算效率提升**:某些情况下,对偶问题的规模更小或结构更简单,求解速度更快
2. **灵敏度分析**:对偶变量(影子价格)能告诉你约束条件的"价值",这在资源定价中至关重要
3. **理论保证**:强对偶定理确保在适当条件下,原问题与对偶问题的最优值相等
> 提示:对偶变量在经济学中被称为"影子价格",它表示增加一单位资源约束所能带来的目标函数改进量。这个解释让抽象的数学概念变得直观有用。
### 1.2 从简单例子看对偶转换
让我们从一个最简单的线性规划问题开始:
```
最小化: 3x₁ + 4x₂
约束条件:
2x₁ + x₂ ≥ 6
x₁ + 3x₂ ≥ 9
x₁, x₂ ≥ 0
```
这个问题的对偶形式是什么?按照标准转换规则,我们可以直接写出:
```
最大化: 6y₁ + 9y₂
约束条件:
2y₁ + y₂ ≤ 3
y₁ + 3y₂ ≤ 4
y₁, y₂ ≥ 0
```
但这样的机械转换缺乏直观理解。让我们换个角度思考:原问题中,我们试图用最小成本满足资源需求;对偶问题中,我们则试图给资源定价,使得定价总和最大,但每种产品的"虚拟成本"不能超过实际成本。
### 1.3 对偶转换的一般形式
对于标准形式的线性规划问题:
**原问题(Primal)**:
```
最小化 cᵀx
约束条件:Ax ≥ b, x ≥ 0
```
**对偶问题(Dual)**:
```
最大化 bᵀy
约束条件:Aᵀy ≤ c, y ≥ 0
```
这种对应关系可以通过拉格朗日函数自然推导出来。考虑拉格朗日函数:
```
L(x, y) = cᵀx + yᵀ(b - Ax)
```
其中y ≥ 0是对偶变量。对于任意可行解x,我们有:
```
cᵀx ≥ L(x, y) = cᵀx + yᵀ(b - Ax) ≥ yᵀb
```
因此yᵀb是原问题目标函数的下界。最大化这个下界就得到了对偶问题。
## 2. 对偶问题的构建:从理论到算法
### 2.1 系统化的构建方法
对于更一般的线性规划形式,我们需要一个系统化的构建方法。考虑混合形式的原问题:
```
最小化 c₁ᵀx₁ + c₂ᵀx₂ + c₃ᵀx₃
约束条件:
A₁₁x₁ + A₁₂x₂ + A₁₃x₃ ≥ b₁
A₂₁x₁ + A₂₂x₂ + A₂₃x₃ = b₂
A₃₁x₁ + A₃₂x₂ + A₃₃x₃ ≤ b₃
x₁ ≥ 0, x₂自由, x₃ ≤ 0
```
构建对偶问题的规则可以总结为下表:
| 原问题特征 | 对偶问题对应规则 |
|------------|------------------|
| 最小化目标 | 最大化目标 |
| 第i个约束为"≥" | 对偶变量yᵢ ≥ 0 |
| 第i个约束为"=" | 对偶变量yᵢ自由 |
| 第j个变量xⱼ ≥ 0 | 第j个约束为"≤" |
| 第j个变量xⱼ自由 | 第j个约束为"=" |
| 第j个变量xⱼ ≤ 0 | 第j个约束为"≥" |
基于这些规则,上述问题的对偶为:
```
最大化 b₁ᵀy₁ + b₂ᵀy₂ + b₃ᵀy₃
约束条件:
A₁₁ᵀy₁ + A₂₁ᵀy₂ + A₃₁ᵀy₃ ≤ c₁
A₁₂ᵀy₁ + A₂₂ᵀy₂ + A₃₂ᵀy₃ = c₂
A₁₃ᵀy₁ + A₂₃ᵀy₂ + A₃₃ᵀy₃ ≥ c₃
y₁ ≥ 0, y₂自由, y₃ ≤ 0
```
### 2.2 Python实现:自动化对偶转换
理解了理论规则后,我们可以用Python实现自动化的对偶转换。下面是一个通用的转换函数:
```python
import numpy as np
from enum import Enum
class ConstraintType(Enum):
GEQ = ">=" # 大于等于
EQ = "=" # 等于
LEQ = "<=" # 小于等于
class VariableType(Enum):
NONNEGATIVE = ">=0" # 非负
FREE = "free" # 自由
NONPOSITIVE = "<=0" # 非正
def build_dual_primal(c, A, b, constraint_types, variable_types):
"""
构建线性规划问题的对偶形式
参数:
c: 目标函数系数向量 (n维)
A: 约束矩阵 (m×n)
b: 约束右侧向量 (m维)
constraint_types: 约束类型列表,长度为m
variable_types: 变量类型列表,长度为n
返回:
dual_c, dual_A, dual_b, dual_constraint_types, dual_variable_types
"""
m, n = A.shape
# 对偶问题的目标函数系数 = 原问题的约束右侧
dual_c = b.copy()
# 对偶问题的约束矩阵 = 原问题约束矩阵的转置
dual_A = A.T.copy()
# 对偶问题的约束右侧 = 原问题的目标函数系数
dual_b = c.copy()
# 确定对偶变量的类型(基于原问题约束类型)
dual_variable_types = []
for constr_type in constraint_types:
if constr_type == ConstraintType.GEQ:
dual_variable_types.append(VariableType.NONNEGATIVE)
elif constr_type == ConstraintType.EQ:
dual_variable_types.append(VariableType.FREE)
elif constr_type == ConstraintType.LEQ:
dual_variable_types.append(VariableType.NONPOSITIVE)
# 确定对偶约束的类型(基于原问题变量类型)
dual_constraint_types = []
for var_type in variable_types:
if var_type == VariableType.NONNEGATIVE:
dual_constraint_types.append(ConstraintType.LEQ)
elif var_type == VariableType.FREE:
dual_constraint_types.append(ConstraintType.EQ)
elif var_type == VariableType.NONPOSITIVE:
dual_constraint_types.append(ConstraintType.GEQ)
return dual_c, dual_A, dual_b, dual_constraint_types, dual_variable_types
# 示例:构建一个具体问题的对偶
def example_dual_conversion():
# 原问题:最小化 3x1 + 4x2 + 5x3
# 约束:2x1 + x2 + 3x3 >= 10
# x1 + 2x2 + x3 = 8
# x1, x2 >= 0, x3自由
c = np.array([3, 4, 5])
A = np.array([[2, 1, 3],
[1, 2, 1]])
b = np.array([10, 8])
constraint_types = [ConstraintType.GEQ, ConstraintType.EQ]
variable_types = [VariableType.NONNEGATIVE,
VariableType.NONNEGATIVE,
VariableType.FREE]
dual_c, dual_A, dual_b, dual_constr_types, dual_var_types = \
build_dual_primal(c, A, b, constraint_types, variable_types)
print("原问题:")
print(f"最小化: {c}·x")
print(f"约束: A = \n{A}")
print(f" b = {b}")
print(f"约束类型: {[t.value for t in constraint_types]}")
print(f"变量类型: {[t.value for t in variable_types]}")
print("\n对偶问题:")
print(f"最大化: {dual_c}·y")
print(f"约束: A' = \n{dual_A}")
print(f" b' = {dual_b}")
print(f"约束类型: {[t.value for t in dual_constr_types]}")
print(f"变量类型: {[t.value for t in dual_var_types]}")
if __name__ == "__main__":
example_dual_conversion()
```
这个实现虽然基础,但清晰地展示了对偶转换的规则。在实际应用中,我们还需要处理更复杂的情况,比如有界变量、混合整数规划等。
## 3. 强对偶定理与互补松弛条件
### 3.1 强对偶定理的核心思想
强对偶定理是线性规划理论中最优美的结果之一。它指出:如果原问题和对偶问题都有可行解,那么它们都有最优解,并且最优值相等。用数学语言表达:
**定理**:对于线性规划问题,如果原问题和对偶问题都可行,则存在最优解x*和y*,使得:
```
cᵀx* = bᵀy*
```
这个定理的证明通常基于单纯形法的收敛性,或者更一般地,基于凸分析中的分离定理。从几何角度看,强对偶性意味着原问题和对偶问题的最优解在某种意义下"对齐"了。
> 注意:强对偶性成立需要一定的条件,最常见的是Slater条件——存在严格可行的内点。对于线性规划,只要原问题和对偶问题都可行,强对偶性就一定成立。
### 3.2 互补松弛条件:连接原问题与对偶问题的桥梁
互补松弛条件是对偶理论中另一个关键概念。它描述了原问题最优解和对偶问题最优解之间的关系:
**互补松弛条件**:设x*是原问题的最优解,y*是对偶问题的最优解,则对于每个约束,都有:
1. 如果原问题的第i个约束在最优解处是严格不等式(松弛的),那么对应的对偶变量yᵢ* = 0
2. 如果对偶问题的第j个约束在最优解处是严格不等式,那么对应的原变量xⱼ* = 0
用数学公式表示:
```
yᵢ*(bᵢ - Aᵢx*) = 0, i = 1,...,m
xⱼ*(cⱼ - Aⱼᵀy*) = 0, j = 1,...,n
```
这些条件在算法设计和灵敏度分析中非常有用。例如,在单纯形法中,我们可以利用互补松弛条件来验证解的最优性。
### 3.3 Python实现:验证强对偶与互补松弛
让我们用代码验证这些理论性质:
```python
import numpy as np
from scipy.optimize import linprog
def verify_strong_duality_and_complementarity():
"""
验证强对偶定理和互补松弛条件
"""
# 定义一个简单的线性规划问题
# 最小化: 3x1 + 4x2
# 约束: 2x1 + x2 >= 6
# x1 + 3x2 >= 9
# x1, x2 >= 0
# 原问题(转换为标准形式)
c_primal = np.array([3, 4])
A_primal = np.array([[-2, -1], # 转换为 <= 形式
[-1, -3]])
b_primal = np.array([-6, -9]) # 注意符号变化
bounds_primal = [(0, None), (0, None)]
# 求解原问题
res_primal = linprog(c_primal, A_ub=A_primal, b_ub=b_primal,
bounds=bounds_primal, method='highs')
if not res_primal.success:
print("原问题无可行解或无界")
return
x_opt = res_primal.x
primal_opt = res_primal.fun
print(f"原问题最优解: x = {x_opt}")
print(f"原问题最优值: {primal_opt}")
# 对偶问题
# 最大化: 6y1 + 9y2
# 约束: 2y1 + y2 <= 3
# y1 + 3y2 <= 4
# y1, y2 >= 0
# 注意:linprog默认求解最小化问题,所以我们对目标取负
c_dual = np.array([-6, -9]) # 取负以实现最大化
A_dual = np.array([[2, 1],
[1, 3]])
b_dual = np.array([3, 4])
bounds_dual = [(0, None), (0, None)]
# 求解对偶问题
res_dual = linprog(c_dual, A_ub=A_dual, b_ub=b_dual,
bounds=bounds_dual, method='highs')
if not res_dual.success:
print("对偶问题无可行解或无界")
return
y_opt = res_dual.x
dual_opt = -res_dual.fun # 恢复原始目标值
print(f"\n对偶问题最优解: y = {y_opt}")
print(f"对偶问题最优值: {dual_opt}")
# 验证强对偶性
print(f"\n强对偶性验证:")
print(f"原问题最优值: {primal_opt}")
print(f"对偶问题最优值: {dual_opt}")
print(f"差值: {abs(primal_opt - dual_opt)}")
if np.allclose(primal_opt, dual_opt):
print("✓ 强对偶性成立:原问题和对偶问题最优值相等")
else:
print("✗ 强对偶性不成立")
# 验证互补松弛条件
print(f"\n互补松弛条件验证:")
# 原问题约束的松弛量
slack_primal = b_primal - A_primal @ x_opt
print(f"原问题约束松弛量: {slack_primal}")
# 对偶问题约束的松弛量
slack_dual = b_dual - A_dual @ y_opt
print(f"对偶问题约束松弛量: {slack_dual}")
# 检查互补松弛条件
complementarity_errors = []
for i in range(len(slack_primal)):
product = y_opt[i] * slack_primal[i]
complementarity_errors.append(abs(product))
print(f"约束{i}: y{i} * 松弛量{i} = {y_opt[i]:.6f} * {slack_primal[i]:.6f} = {product:.6f}")
for j in range(len(slack_dual)):
product = x_opt[j] * slack_dual[j]
complementarity_errors.append(abs(product))
print(f"变量{j}: x{j} * 对偶松弛量{j} = {x_opt[j]:.6f} * {slack_dual[j]:.6f} = {product:.6f}")
max_error = max(complementarity_errors)
print(f"\n最大互补松弛误差: {max_error:.10f}")
if max_error < 1e-6:
print("✓ 互补松弛条件满足")
else:
print("✗ 互补松弛条件不满足")
if __name__ == "__main__":
verify_strong_duality_and_complementarity()
```
运行这段代码,你会看到原问题和对偶问题的最优值确实相等,并且互补松弛条件得到满足。这种验证不仅加深了对理论的理解,也为我们后续设计算法提供了信心。
## 4. 网络流问题的对偶:最大流最小割定理
### 4.1 最大流问题的对偶形式
网络流问题是线性规划对偶理论的一个经典应用。考虑一个有向图G=(V,E),源点s,汇点t,每条边(i,j)有容量cᵢⱼ。最大流问题可以表述为:
```
最大化:从s到t的流量
约束条件:
流量守恒:每个节点(除s,t外)的流入等于流出
容量限制:0 ≤ fᵢⱼ ≤ cᵢⱼ 对所有边(i,j)
```
这个问题的对偶恰好是最小割问题。最小割问题是:找到将节点分成两个集合S和T(s∈S, t∈T)的分割,使得从S到T的边的容量之和最小。
**最大流最小割定理**:任何网络中的最大流值等于最小割的容量。
这个定理不仅优美,而且实用。它告诉我们,寻找最大流和寻找最小割本质上是同一个问题的两个方面。
### 4.2 Python实现:最大流问题及其对偶
让我们用Python实现最大流问题的求解,并验证最大流最小割定理:
```python
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import networkx as nx
class MaxFlowMinCut:
"""最大流最小割问题的实现"""
def __init__(self, n, source, sink):
"""
初始化网络
参数:
n: 节点数量
source: 源点索引
sink: 汇点索引
"""
self.n = n
self.source = source
self.sink = sink
self.capacity = np.zeros((n, n), dtype=float)
self.adj = [[] for _ in range(n)]
def add_edge(self, u, v, cap):
"""添加有向边"""
self.capacity[u][v] = cap
self.adj[u].append(v)
self.adj[v].append(u) # 用于反向边
def bfs(self, parent):
"""广度优先搜索寻找增广路径"""
visited = [False] * self.n
queue = deque([self.source])
visited[self.source] = True
while queue:
u = queue.popleft()
for v in self.adj[u]:
if not visited[v] and self.capacity[u][v] > 0:
queue.append(v)
visited[v] = True
parent[v] = u
if v == self.sink:
return True
return False
def edmonds_karp(self):
"""Edmonds-Karp算法求解最大流"""
parent = [-1] * self.n
max_flow = 0
# 复制容量矩阵,避免修改原始数据
capacity_copy = self.capacity.copy()
# 不断寻找增广路径
while self.bfs(parent):
# 找到增广路径上的最小剩余容量
path_flow = float('inf')
v = self.sink
while v != self.source:
u = parent[v]
path_flow = min(path_flow, capacity_copy[u][v])
v = u
# 更新剩余网络
v = self.sink
while v != self.source:
u = parent[v]
capacity_copy[u][v] -= path_flow
capacity_copy[v][u] += path_flow # 添加反向边
v = u
max_flow += path_flow
# 恢复原始容量矩阵
self.flow = self.capacity - capacity_copy
return max_flow
def find_min_cut(self):
"""基于最大流找到最小割"""
# 使用BFS在剩余网络中寻找从源点可达的节点
visited = [False] * self.n
queue = deque([self.source])
visited[self.source] = True
while queue:
u = queue.popleft()
for v in self.adj[u]:
if not visited[v] and self.capacity[u][v] - self.flow[u][v] > 0:
visited[v] = True
queue.append(v)
# visited为True的节点在S集合中,其余在T集合中
S = [i for i in range(self.n) if visited[i]]
T = [i for i in range(self.n) if not visited[i]]
# 计算割的容量
cut_capacity = 0
cut_edges = []
for u in S:
for v in T:
if self.capacity[u][v] > 0:
cut_capacity += self.capacity[u][v]
cut_edges.append((u, v))
return S, T, cut_edges, cut_capacity
def solve_and_verify(self):
"""求解并验证最大流最小割定理"""
print("求解最大流问题...")
max_flow = self.edmonds_karp()
print(f"最大流值: {max_flow}")
print("\n寻找最小割...")
S, T, cut_edges, min_cut = self.find_min_cut()
print(f"最小割容量: {min_cut}")
print(f"S集合(包含源点): {S}")
print(f"T集合(包含汇点): {T}")
print(f"割边: {cut_edges}")
# 验证定理
print(f"\n验证最大流最小割定理:")
print(f"最大流值 = {max_flow}")
print(f"最小割容量 = {min_cut}")
if np.allclose(max_flow, min_cut):
print("✓ 定理成立:最大流等于最小割")
else:
print("✗ 定理不成立")
return max_flow, min_cut, S, T, cut_edges
def visualize_network(self, S=None, T=None, cut_edges=None):
"""可视化网络和割"""
G = nx.DiGraph()
# 添加节点
for i in range(self.n):
G.add_node(i)
# 添加边
edge_labels = {}
for i in range(self.n):
for j in range(self.n):
if self.capacity[i][j] > 0:
G.add_edge(i, j)
edge_labels[(i, j)] = f"{self.flow[i][j]:.1f}/{self.capacity[i][j]:.1f}"
# 设置布局
pos = nx.spring_layout(G, seed=42)
plt.figure(figsize=(12, 8))
# 绘制节点
node_colors = []
if S is not None and T is not None:
for node in G.nodes():
if node in S:
node_colors.append('lightblue')
else:
node_colors.append('lightcoral')
else:
node_colors = ['lightgray'] * self.n
nx.draw_networkx_nodes(G, pos, node_color=node_colors,
node_size=500, alpha=0.8)
# 绘制边
edge_colors = []
edge_widths = []
if cut_edges is not None:
for edge in G.edges():
if edge in cut_edges or (edge[1], edge[0]) in cut_edges:
edge_colors.append('red')
edge_widths.append(3)
else:
edge_colors.append('gray')
edge_widths.append(1)
else:
edge_colors = ['gray'] * len(G.edges())
edge_widths = [1] * len(G.edges())
nx.draw_networkx_edges(G, pos, edge_color=edge_colors,
width=edge_widths, alpha=0.7,
arrows=True, arrowsize=20)
# 添加标签
nx.draw_networkx_labels(G, pos, font_size=12, font_weight='bold')
nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels,
font_size=10)
# 标记源点和汇点
plt.text(pos[self.source][0], pos[self.source][1]+0.1,
'源点', fontsize=14, ha='center', color='darkblue')
plt.text(pos[self.sink][0], pos[self.sink][1]+0.1,
'汇点', fontsize=14, ha='center', color='darkred')
plt.title("网络流与最小割", fontsize=16)
plt.axis('off')
plt.tight_layout()
plt.show()
# 创建并求解一个网络流实例
def network_flow_example():
"""网络流问题示例"""
# 创建一个有6个节点的网络
n = 6
source = 0
sink = 5
mf = MaxFlowMinCut(n, source, sink)
# 添加边(u, v, 容量)
edges = [
(0, 1, 16), (0, 2, 13),
(1, 2, 10), (1, 3, 12),
(2, 1, 4), (2, 4, 14),
(3, 2, 9), (3, 5, 20),
(4, 3, 7), (4, 5, 4)
]
for u, v, cap in edges:
mf.add_edge(u, v, cap)
# 求解并验证
max_flow, min_cut, S, T, cut_edges = mf.solve_and_verify()
# 可视化
mf.visualize_network(S, T, cut_edges)
return mf, max_flow, min_cut
if __name__ == "__main__":
mf, max_flow, min_cut = network_flow_example()
```
这段代码实现了Edmonds-Karp算法求解最大流,并基于最大流找到最小割。可视化部分清晰地展示了网络结构、流量分配以及最小割的位置。
### 4.3 对偶视角下的网络流分析
从对偶的角度看,最大流问题的对偶变量可以解释为节点的"势能"或"价格"。对偶问题的约束条件:
```
πᵢ - πⱼ ≤ cᵢⱼ 对所有边(i,j)
πₛ = 1, πₜ = 0
πᵢ ≥ 0
```
其中πᵢ表示节点i的势能。对偶问题的目标是最大化从源点到汇点的势能差,这正好对应于最小割的容量。
这种对偶解释不仅提供了理论洞察,还启发了新的算法设计。例如,最短路增广算法就可以从对偶变量的角度来理解:每次迭代中,我们沿着最短路径增广流量,同时更新节点的势能(对偶变量)。
## 5. 对偶单纯形法:从理论到高效实现
### 5.1 对偶单纯形法的基本思想
传统单纯形法在原问题的可行域内移动,而对偶单纯形法在对偶问题的可行域内移动。这种方法特别适合以下情况:
1. 初始基解对偶可行但原始不可行
2. 添加新约束后重新优化
3. 整数规划中的分支定界法
对偶单纯形法的关键优势在于,它可以从一个对偶可行但原始不可行的解开始,通过迭代逐步达到原始可行性和对偶最优性。
算法的基本步骤:
1. **初始化**:找到一个对偶可行的基解(检验数非负)
2. **选择离基变量**:选择原始不可行的基变量(bᵢ < 0)
3. **选择进基变量**:根据最小比值测试选择进基变量
4. **旋转**:执行基变换
5. **迭代**:重复直到原始可行
### 5.2 Python实现:完整的对偶单纯形法
下面是一个完整的对偶单纯形法实现,包含详细的注释和错误处理:
```python
import numpy as np
from typing import Tuple, Optional, List
class DualSimplexSolver:
"""对偶单纯形法求解器"""
def __init__(self, c: np.ndarray, A: np.ndarray, b: np.ndarray):
"""
初始化线性规划问题
参数:
c: 目标函数系数 (n维)
A: 约束矩阵 (m×n)
b: 约束右侧 (m维)
假设问题形式为: 最小化 cᵀx, 约束 Ax = b, x ≥ 0
"""
self.c = c.astype(float)
self.A = A.astype(float)
self.b = b.astype(float)
self.m, self.n = A.shape
self.basis = None # 基变量索引
self.non_basis = None # 非基变量索引
self.tableau = None # 单纯形表
def _initialize_tableau(self):
"""初始化单纯形表"""
# 添加人工变量构造初始基
eye_m = np.eye(self.m)
self.A_aug = np.hstack([self.A, eye_m])
self.c_aug = np.hstack([self.c, np.zeros(self.m)])
# 初始基变量为人工变量
self.basis = list(range(self.n, self.n + self.m))
self.non_basis = list(range(self.n))
# 构建单纯形表
self.tableau = np.zeros((self.m + 1, self.n + self.m + 1))
# 约束部分
self.tableau[:self.m, :self.n] = self.A
self.tableau[:self.m, self.n:self.n+self.m] = eye_m
self.tableau[:self.m, -1] = self.b
# 目标函数行
self.tableau[-1, :self.n] = -self.c
self.tableau[-1, -1] = 0 # 目标函数值
# 将对偶单纯形表转换为对偶可行形式
self._make_dual_feasible()
def _make_dual_feasible(self):
"""确保单纯形表对偶可行(检验数非负)"""
# 检查检验数(最后一行,除了最后一列)
reduced_costs = self.tableau[-1, :-1]
# 如果存在负的检验数,需要调整
if np.any(reduced_costs < -1e-10):
# 这里使用两阶段法或大M法处理
# 简化处理:将所有检验数调整为非负
min_cost = np.min(reduced_costs)
if min_cost < 0:
# 添加一个足够大的常数到目标函数
adjustment = -min_cost + 1
self.tableau[-1, :-1] += adjustment
def _select_leaving_variable(self) -> Optional[int]:
"""选择离基变量(原始不可行的基变量)"""
# 查找b列中的负值(最后一列,除了最后一行)
b_col = self.tableau[:-1, -1]
# 找到最小的负值(最不可行的)
min_b_idx = np.argmin(b_col)
min_b_value = b_col[min_b_idx]
if min_b_value >= -1e-10: # 原始可行
return None
return min_b_idx
def _select_entering_variable(self, leaving_idx: int) -> Optional[int]:
"""选择进基变量"""
# 获取离基变量所在行(除了最后一列)
row = self.tableau[leaving_idx, :-1]
# 只考虑负的系数(对偶单纯形法的规则)
negative_mask = row < -1e-10
if not np.any(negative_mask):
# 无界问题
return None
# 计算比值:检验数 / 系数(只考虑负系数)
reduced_costs = self.tableau[-1, :-1]
ratios = np.full(self.n + self.m, np.inf)
# 只对负系数计算比值
ratios[negative_mask] = -reduced_costs[negative_mask] / row[negative_mask]
# 选择最小非负比值对应的变量
# 注意:这里要排除无穷大的比值
finite_mask = np.isfinite(ratios)
if not np.any(finite_mask):
return None
entering_idx = np.argmin(ratios[finite_mask])
# 映射回原始索引
valid_indices = np.where(finite_mask)[0]
return valid_indices[entering_idx]
def _pivot(self, leaving_idx: int, entering_idx: int):
"""执行旋转操作"""
# 获取主元
pivot = self.tableau[leaving_idx, entering_idx]
# 更新离基变量所在行
self.tableau[leaving_idx, :] /= pivot
# 更新其他行
for i in range(self.m + 1):
if i != leaving_idx:
factor = self.tableau[i, entering_idx]
self.tableau[i, :] -= factor * self.tableau[leaving_idx, :]
# 更新基变量索引
leaving_basis_var = self.basis[leaving_idx]
self.basis[leaving_idx] = entering_idx
# 更新非基变量索引
if entering_idx in self.non_basis:
self.non_basis.remove(entering_idx)
self.non_basis.append(leaving_basis_var)
def _get_solution(self) -> Tuple[np.ndarray, float]:
"""从单纯形表中提取解"""
x = np.zeros(self.n)
# 提取基变量的值
for i, var_idx in enumerate(self.basis):
if var_idx < self.n: # 只考虑原始变量
x[var_idx] = self.tableau[i, -1]
# 目标函数值(取负号,因为表中存储的是 -z)
obj_value = -self.tableau[-1, -1]
return x, obj_value
def solve(self, max_iterations: int = 1000) -> Tuple[Optional[np.ndarray], Optional[float], bool]:
"""求解线性规划问题"""
self._initialize_tableau()
for iteration in range(max_iterations):
# 检查原始可行性
b_col = self.tableau[:-1, -1]
if np.all(b_col >= -1e-10):
# 原始可行,达到最优
x, obj_value = self._get_solution()
return x, obj_value, True
# 选择离基变量
leaving_idx = self._select_leaving_variable()
if leaving_idx is None:
# 原始可行
x, obj_value = self._get_solution()
return x, obj_value, True
# 选择进基变量
entering_idx = self._select_entering_variable(leaving_idx)
if entering_idx is None:
# 问题无界或无解
return None, None, False
# 执行旋转
self._pivot(leaving_idx, entering_idx)
# 打印迭代信息(调试用)
if iteration % 100 == 0:
obj_val = -self.tableau[-1, -1]
print(f"迭代 {iteration}: 目标值 = {obj_val:.6f}")
print(f"达到最大迭代次数 {max_iterations}")
return None, None, False
def print_solution(self, x: np.ndarray, obj_value: float):
"""打印求解结果"""
print("\n" + "="*50)
print("对偶单纯形法求解结果")
print("="*50)
print(f"\n最优目标值: {obj_value:.6f}")
print("\n最优解:")
for i, val in enumerate(x):
print(f" x[{i}] = {val:.6f}")
# 验证约束
print("\n约束验证:")
residuals = self.A @ x - self.b
max_residual = np.max(np.abs(residuals))
print(f" 最大约束残差: {max_residual:.2e}")
# 验证对偶可行性(检验数非负)
if hasattr(self, 'basis'):
print("\n对偶可行性验证:")
reduced_costs = self.tableau[-1, :-1]
min_reduced_cost = np.min(reduced_costs)
print(f" 最小检验数: {min_reduced_cost:.2e}")
if min_reduced_cost >= -1e-10:
print(" ✓ 对偶可行")
else:
print(" ✗ 对偶不可行")
# 测试对偶单纯形法
def test_dual_simplex():
"""测试对偶单纯形法"""
print("测试1: 标准线性规划问题")
print("-" * 40)
# 问题: 最小化 -x1 - 2x2
# 约束: x1 + x2 <= 4
# x1 - x2 <= 2
# x1, x2 >= 0
# 转换为标准形式: 添加松弛变量
c = np.array([-1, -2, 0, 0]) # 原目标,添加松弛变量
A = np.array([[1, 1, 1, 0], # 第一个约束
[1, -1, 0, 1]]) # 第二个约束
b = np.array([4, 2])
solver = DualSimplexSolver(c, A, b)
x_opt, obj_val, success = solver.solve()
if success:
solver.print_solution(x_opt[:2], obj_val) # 只显示原始变量
else:
print("求解失败")
print("\n" + "="*50)
print("测试2: 原始不可行但对偶可行的问题")
print("-" * 40)
# 这个问题初始解原始不可行但对偶可行
c = np.array([2, 3, 0, 0])
A = np.array([[-1, -1, 1, 0], # -x1 - x2 + s1 = -1
[2, -1, 0, 1]]) # 2x1 - x2 + s2 = 4
b = np.array([-1, 4])
solver2 = DualSimplexSolver(c, A, b)
x_opt2, obj_val2, success2 = solver2.solve()
if success2:
solver2.print_solution(x_opt2[:2], obj_val2)
else:
print("求解失败")
if __name__ == "__main__":
test_dual_simplex()
```
这个实现展示了对偶单纯形法的核心逻辑。虽然为了清晰起见,代码中做了一些简化(比如使用两阶段法的简化版本),但它完整地展示了算法的关键步骤。
### 5.3 对偶单纯形法的实际应用场景
在实际应用中,对偶单纯形法特别适合以下情况:
**场景1:添加新约束后的重新优化**
当我们在已求解的线性规划问题中添加新约束时,原最优解可能不再可行,但对偶可行性通常保持。这时使用对偶单纯形法可以快速找到新的最优解,而不必从头开始。
**场景2:分支定界法中的节点处理**
在整数规划的分支定界法中,每个节点对应一个线性松弛问题。当添加分支约束后,父节点的解可能不再可行,但对偶单纯形法可以高效处理这种情况。
**场景3:大规模稀疏问题的求解**
对于某些结构特殊的大规模稀疏问题,对偶单纯形法可能比原始单纯形法更高效,因为它可以利用问题的对偶结构。
## 6. 对偶理论在机器学习中的应用:支持向量机
### 6.1 支持向量机的对偶形式
支持向量机(SVM)是对偶理论在机器学习中的经典应用。原始SVM问题可以表述为:
```
最小化: 1/2 ||w||² + C∑ξᵢ
约束条件: yᵢ(wᵀxᵢ + b) ≥ 1 - ξᵢ
ξᵢ ≥ 0
```
通过拉格朗日对偶,我们可以得到对偶问题:
```
最大化: ∑αᵢ - 1/2 ∑∑αᵢαⱼyᵢyⱼxᵢᵀxⱼ
约束条件: 0 ≤ αᵢ ≤ C
∑αᵢyᵢ = 0
```
这个对偶形式有几个重要优势:
1. 只涉及样本之间的内积,便于核技巧的应用
2. 问题规模由特征维度变为样本数量,适合特征维度高的情况
3. 支持向量的识别更加直观(αᵢ > 0的样本)
### 6.2 Python实现:SVM的对偶求解
下面是一个简化的SVM对偶问题求解实现:
```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from cvxopt import matrix, solvers
import matplotlib.pyplot as plt
class DualSVM:
"""对偶形式SVM的实现"""
def __init__(self, C=1.0, kernel='linear'):
"""
初始化SVM
参数:
C: 正则化参数
kernel: 核函数类型,目前只支持'linear'
"""
self.C = C
self.kernel = kernel
self.alpha = None
self.b = None
self.support_vectors = None
self.support_labels = None
def _linear_kernel(self, X1, X2):
"""线性核函数"""
return X1 @ X2.T
def fit(self, X, y):
"""
训练SVM(求解对偶问题)
参数:
X: 训练特征,形状 (n_samples, n_features)
y: 训练标签,形状 (n_samples,),取值为±1
"""
n_samples, n_features = X.shape
# 构建对偶问题的二次规划形式
# 最小化: 1/2 αᵀPα - 1ᵀα
# 约束: yᵀα = 0, 0 ≤ αᵢ ≤ C
# 核矩阵
if self.kernel == 'linear':
K = self._linear_kernel(X, X)
else:
raise ValueError("目前只支持线性核")
# 二次项矩阵 P = yᵢyⱼK(xᵢ, xⱼ)
y_reshaped = y.reshape(-1, 1)
P = matrix(np.outer(y, y) * K)
# 线性项 q = -1向量
q = matrix(-np.ones(n_samples))
# 等式约束: yᵀα = 0
A = matrix(y.reshape(1, -1).astype(float))
b = matrix(0.0)
# 不等式约束: 0 ≤ αᵢ ≤ C
# 等价于: αᵢ ≥ 0 和 αᵢ ≤ C
G = matrix(np.vstack([-np.eye(n_samples), np.eye(n_samples)]))
h = matrix(np.hstack([np.zeros(n_samples), np.ones(n_samples) * self.C]))
# 求解二次规划
solvers.options['show_progress'] = False
solution = solvers.qp(P, q, G, h, A, b)
# 提取解
self.alpha = np.array(solution['x']).flatten()
# 找到支持向量 (α > 0)
sv_indices = self.alpha > 1e-5
self.support_vectors = X[sv_indices]
self.support_labels = y[sv_indices]
self.alpha_sv = self.alpha[sv_indices]
# 计算偏置b
self.b = 0
for i in range(len(self.alpha_sv)):
self.b += self.support_labels[i]
self.b -= np.sum(self.alpha_sv * self.support_labels *
K[sv_indices][i, sv_indices])
self.b /= len(self.alpha_sv)
# 计算权重向量w(仅线性核)
if self.kernel == 'linear':
self.w = np.zeros(n_features)
for i in range(len(self.alpha_sv)):
self.w += self.alpha_sv[i] * self.support_labels[i] * self.support_vectors[i]
else:
self.w = None
def predict(self, X):
"""预测"""
if self.kernel == 'linear':
scores = X @ self.w + self.b
else:
# 对于非线性核,需要计算核函数
scores = np.zeros(X.shape[0])
for i in range(X.shape[0]):
for j in range(len(self.support_vectors)):
scores[i] += (self.alpha_sv[j] * self.support_labels[j] *
self._linear_kernel(X[i:i+1], self.support_vectors[j:j+1]))
scores += self.b
return np.sign(scores)
def visualize(self, X, y, X_test=None, y_test=None):
"""可视化SVM决策边界(仅适用于二维特征)"""
if X.shape[1] != 2:
print("可视化仅支持二维特征")
return
plt.figure(figsize=(12, 8))
# 创建网格
x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, 0.02),
np.arange(y_min, y_max, 0.02))
# 预测网格点的类别
Z = self.predict(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
# 绘制决策边界和间隔
plt.contourf(xx, yy, Z, alpha=0.3, cmap=plt.cm.coolwarm)
plt.contour(xx, yy, Z, colors='k', linewidths=0.5, linestyles='solid')
# 绘制训练数据
plt.scatter(X[:, 0], X[:, 1], c=y, cmap=plt.cm.coolwarm,
edgecolors='k', s=50, alpha=0.7, label='训练数据')
# 绘制支持向量
if self.support_vectors is not None:
plt.scatter(self.support_vectors[:, 0], self.support_vectors[:, 1],
s=150, facecolors='none', edgecolors='k',
linewidths=2, label='支持向量')
# 绘制测试数据(如果提供)
if X_test is not None and y_test is not None:
plt.scatter(X_test[:, 0], X_test[:, 1], c=y_test,
cmap=plt.cm.coolwarm, marker='s', s=30,
alpha=0.5, label='测试数据')
plt.xlabel('特征1', fontsize=12)
plt.ylabel('特征2', fontsize=12)
plt.title(f'SVM决策边界 (C={self.C})', fontsize=14)
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 创建并训练SVM
def svm_example():
"""SVM对偶问题求解示例"""
# 生成合成数据
X, y = make_classification(n_samples=100, n_features=2, n_informative=2,
n_redundant=0, n_clusters_per_class=1,
flip_y=0.1, random_state=42)
y = np.where(y == 0, -1, 1) # 转换为±1标签
# 划分训练测试集
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
print("训练数据形状:", X_train.shape)
print("测试数据形状:", X_test.shape)
# 训练SVM
svm = DualSVM(C=1.0, kernel='linear')
svm.fit(X_train, y_train)
# 评估
train_pred = svm.predict(X_train)
test_pred = svm.predict(X_test)
train_acc = np.mean(train_pred == y_train)
test_acc = np.mean(test_pred == y_test)
print(f"\n训练准确率: {train_acc:.4f}")
print(f"测试准确率: {test_acc:.4f}")
print(f"支持向量数量: {len(svm.support_vectors)}")
print(f"偏置项 b: {svm.b:.4f}")
if svm.w is not None:
print(f"权重向量 w: {svm.w}")
# 可视化
svm.visualize(X_train, y_train, X_test, y_test)
return svm
if __name__ == "__main__":
svm = svm_example()
```
这个实现展示了如何将对偶理论应用于实际的机器学习算法。通过求解对偶问题,SVM能够高效处理高维特征空间中的分类问题,并且通过核技巧扩展到非线性情况。
### 6.3 对偶视角下的SVM解释
从对偶视角看SVM,我们可以获得许多重要洞察:
1. **支持向量的识别**:对偶变量αᵢ > 0对应的样本就是支持向量,它们决定了决策边界
2. **核技巧的自然表达**:对偶形式只涉及样本间的内积,这为核方法提供了天然框架
3. **模型复杂度的控制**:参数C在对偶问题中表现为αᵢ的上界,直接控制了模型的复杂度
对偶形式还揭示了SVM与逻辑回归等方法的深层联系。事实上,许多机器学习算法都可以从对偶角度重新理解,这常常能带来新的算法改进思路。
## 7. 对偶理论在鲁棒优化中的应用
### 7.1 鲁棒优化的对偶转换
鲁棒优化处理的是参数不确定的优化问题。考虑一个简单的线性规划问题,但约束矩阵A的元素有不确定性:
```
最小化 cᵀx
约束条件: Ax ≤ b,其中A在某个不确定集U内
```
通过引入对偶变量,我们可以将鲁棒约束转换为确定性的约束。具体来说,对于每个不确定约束,我们考虑最坏情况:
```
Aᵢx ≤ bᵢ 对所有Aᵢ ∈ Uᵢ
```
通过对每个不确定集取对偶,我们可以将无穷多约束转换为有限个约束,从而得到可求解的鲁棒对等问题。
### 7.2 Python实现:鲁棒线性规划
```python
import numpy as np
from scipy.optimize import linprog
class RobustLinearProgram:
"""鲁棒线性规划求解器"""
def __init__(self, c, A_nominal, b, uncertainty_sets):
"""
初始化鲁棒线性规划问题
参数:
c: 目标函数系数
A_nominal: 标称约束矩阵
b: 约束右侧
uncertainty_sets: 不确定集列表,每个元素为(半径, 范数类型)
"""
self.c = c
self.A_nominal = A_nominal
self.b = b
self.uncertainty_sets = uncertainty_sets
self.m, self.n = A_nominal.shape
def robustify_constraint(self, i):
"""将第i个约束鲁棒化"""
radius, norm_type = self.uncertainty_sets[i]
if norm_type == 'L1':
# 对于L1不确定集,对偶后得到线性约束
# 原始: aᵢᵀx + radius * ||x||_∞ ≤ bᵢ
# 对偶后需要引入辅助变量
pass
elif norm_type == 'Linf':
# 对于L∞不确定集,对偶后得到线性约束
# 原始: aᵢᵀx + radius * ||x||_1 ≤ bᵢ
# 对偶后需要引入辅助变量
pass
else:
raise ValueError(f"不支持的范数类型: {norm_type}")
def solve_robust(self):
"""求解鲁棒优化问题"""
# 这里展示L∞不确定集的情况
# 对于约束 aᵢᵀx + ρ||x||_1 ≤ bᵢ
# 通过对偶转换为: 存在u,v ≥ 0使得
# aᵢᵀx + ρ·1ᵀ(u+v) ≤ bᵢ
# x = u - v
# u, v ≥ 0
n_original = self.n
n_augmented = 2 * self.n # 为每个原始变量添加u和v
# 扩展的目标函数系数
c_ext = np.zeros(n_original + n_augmented)
c_ext[:n_original] = self.c
# 构建约束矩阵
constraints = []
# 原始鲁棒约束
for i in range(self.m):
radius, norm_type = self.uncertainty_sets[i]
if norm_type == 'Linf':
# 对于L∞不确定集
row = np.zeros(n_original + n_augmented)
# aᵢᵀx部分
row[:n_original] = self.A_nominal[i, :]
# ρ·1ᵀ(u+v)部分
row[n_original:n_original + n_augmented] = radius
constraints.append((row, self.b[i]))
# 添加 x = u - v 约束
for j in range(n_original):
row_eq = np.zeros(n_original + n_augmented)
row_eq[j] = 1 # xⱼ
row_eq[n_original + j] = -1 # -uⱼ
row_eq[n_original + n_original + j] = 1 # vⱼ
constraints.append((row_eq, 0))
# 非负约束
bounds = [(None, None)] * n_original + [(0, None)] * n_augmented
# 转换为线性规划标准形式
A_ub = []
b_ub = []
for coeff, rhs in constraints:
A_ub.append(coeff)
b_ub.append(rhs)
A_ub = np.array(A_ub)
b_ub = np.array(b_ub)
# 求解
res = linprog(c_ext, A_ub=A_ub, b_ub=b_ub, bounds=bounds, method='highs')
if res.success:
x_opt = res.x[:n_original]
return x_opt, res.fun
else:
raise ValueError("鲁棒优化问题求解失败")
# 示例:鲁棒投资组合优化
def robust_portfolio_example():
"""鲁棒投资组合优化示例"""
# 假设有3种资产
n_assets = 3
# 预期收益率(有不确定性)
mu_nominal = np.array([0.08, 0.12, 0.15])
# 不确定集:每个收益率的L∞球,半径=0.02
uncertainty_sets = [('Linf', 0.02)] * n_assets
# 协方差矩阵(假设已知且确定)
Sigma = np.array([[0.04, 0.01, 0.02],
[0.01, 0.09, 0.03],
[0.02, 0.03, 0.16]])
# 目标:最大化鲁棒收益,同时控制风险
# 转换为:最小化 -鲁棒收益 + λ·风险
lambda_risk = 0.5 # 风险厌恶系数
# 构建鲁棒优化问题
# 约束:∑xᵢ = 1, xᵢ ≥ 0
# 目标:最大化 min_{μ∈U} μᵀx - λ·xᵀΣx
# 由于鲁棒收益涉及min,通过对偶转换为max问题
# min_{μ∈U} μᵀx = μ₀ᵀx - ρ||x||_1 (对于L∞不确定集)
# 因此原问题等价于:
# 最大化 μ₀ᵀx - ρ||x||_1 - λ·xᵀΣx
# 这是一个凸优化问题,但带有L1范数项
# 可以通过变量拆分转换为线性规划
print("鲁棒投资组合优化示例")
print("=" * 50)
# 这里简化处理,只展示思想
# 实际实现需要考虑二次项xᵀΣx,这需要更复杂的处理
print("鲁棒优化框架已构建")
print("实际应用中需要根据具体问题设计对偶转换策略")
return None
if __name__ == "__main__":
robust_portfolio_example()
```
这个示例展示了如何通过对偶理论处理鲁棒优化问题。在实际应用中,鲁棒优化通常涉及更复杂的不确定集和对偶转换,但基本思想是一致的:通过对偶将半无限规划(无穷多约束)转换为有限维优化问题。
## 8. 对偶理论的高级话题与前沿应用
### 8.1 内点法与对偶理论
内点法是求解线性规划的主流算法之一,它天然地同时处理原问题和对偶问题。原始-对偶内点法在每次迭代中同时更新原变量和对偶变量,并保持它们满足互补松弛条件。
算法的核心思想是求解以下扰动KKT系统:
```
Ax = b
Aᵀy + s = c
xᵢsᵢ = μ, i = 1,...,n
x, s ≥ 0
```
其中μ > 0是中心参数,随着迭代逐渐减小到0。当μ → 0时,我们得到最优解。
### 8.2 半定规划的对偶
半定规划(SDP)是线性规划的推广,其变量是对称半正定矩阵。SDP的对偶理论更加丰富,也更具挑战性。考虑原始SDP问题:
```
最小化 C·X
约束条件: Aᵢ·X = bᵢ, i = 1,...,m
X ≽ 0
```
其对偶问题为:
```
最大化 bᵀy
约束条件: ∑yᵢAᵢ ≼ C
```
其中"·"表示矩阵内积,"≽"和"≼"表示半正定和半负定。
SDP对偶在组合优化、控制系统、量子信息等领域有广泛应用。例如,著名的Max-Cut问题的Goemans-Williamson近似算法就是基于SDP对偶。
### 8.3 分布式优化中的对偶分解
对偶理论在分布式优化中扮演着关键角色。考虑可分离问题:
```
最小化 ∑fᵢ(xᵢ)
约束条件: ∑Aᵢxᵢ = b
xᵢ ∈ Xᵢ
```
通过对偶分解,我们可以将原问题分解为多个子问题:
```
L(x, y) = ∑[fᵢ(xᵢ) + yᵀAᵢxᵢ] - yᵀb
```
对偶问题为:
```
最大化 g(y) = ∑gᵢ(y) - yᵀb
```
其中gᵢ(y) = min_{xᵢ∈Xᵢ} [fᵢ(xᵢ) + yᵀAᵢxᵢ]。
这种分解允许并行求解子问题,非常适合大规模分布式计算。
### 8.4 Python实现:对偶分解算法
```python
import numpy as np
from typing import List, Callable
class DualDecomposition:
"""对偶分解算法"""
def __init__(self, n_subsystems: int,
subproblem_solvers: List[Callable],
coupling_matrix: np.ndarray,
coupling_rhs: np.ndarray):
"""
初始化对偶分解
参数:
n_subsystems: 子系统数量
subproblem_solvers: 子问题求解器列表
coupling_matrix: 耦合矩阵A,形状(m, n_total)
coupling_rhs: 耦合约束右侧b,形状(m,)
"""
self.n_subsystems = n_subsystems
self.subproblem_solvers = subproblem_solvers
self.A = coupling_matrix
self.b = coupling_rhs
self.m = coupling_rhs.shape[0]
# 检查维度
total_vars = sum(solver(0).shape[0] for solver in subproblem_solvers)
assert self.A.shape[1] == total_vars, "维度不匹配"
def solve_subproblem(self, y: np.ndarray, subproblem_idx: int):
"""求解第i个子问题"""
# 这里假设子问题求解器接受对偶变量y作为输入
# 返回最优解和最优值
return self.subproblem_solvers[subproblem_idx](y)
def compute_dual_value(self, y: np.ndarray):
"""计算对偶函数值"""
total_obj = 0
x_parts = []
start_idx = 0
for i in range(self.n_subsystems):
x_i, obj_i = self.solve_subproblem(y, i)
total_obj += obj_i
x_parts.append(x_i)
# 更新索引
start_idx += x_i.shape[0]
# 减去 yᵀb
total_obj -= y @ self.b
# 拼接所有子问题的解
x_full = np.concatenate(x_parts)
return total_obj, x_full
def compute_subgradient(self, x: np.ndarray):
"""计算次梯度:Ax - b"""
return self.A @ x - self.b
def solve(self, max_iter: int = 1000, step_size: float = 0.1,
tol: float = 1e-6, verbose: bool = True):
"""求解对偶问题(使用次梯度法)"""
# 初始化对偶变量
y = np.zeros(self.m)
best_dual = -np.inf
best_x = None
best_y = None
for k in range(max_iter):
# 求解子问题,得到对偶函数值和原变量
dual_val, x = self.compute_dual_value(y)
# 更新最佳解
if dual_val > best_dual:
best_dual = dual_val
best_x = x.copy()
best_y = y.copy()
# 计算次梯度
subgrad = self.compute_subgradient(x)
# 更新对偶变量(次梯度法)
step = step_size / (k + 1) # 递减步长
y = y + step * subgrad
# 投影到非负象限(如果需要)
# y = np.maximum(y, 0)
# 打印进度
if verbose and k % 100 == 0:
primal_feasibility = np.linalg.norm(subgrad)
print(f"迭代 {k}: 对偶值 = {dual_val:.6f}, "
f"原始可行性 = {primal_feasibility:.6f}")
# 检查收敛
if np.linalg.norm(subgrad) < tol:
if verbose:
print(f"在迭代 {k} 收敛")
break
if verbose:
print(f"\n最终结果:")
print(f"最佳对偶值: {best_dual:.6f}")
print(f"原始可行性: {np.linalg.norm(self.A @ best_x - self.b):.6f}")
return best_x, best_y, best_dual
# 示例:资源分配问题
def resource_allocation_example():
"""资源分配问题的对偶分解"""
print("资源分配问题 - 对偶分解示例")
print("=" * 50)
# 问题描述:两个子系统共享资源
# 子系统1: 最小化 f1(x1) = x1², 约束: x1 ≤ 10
# 子系统2: 最小化 f2(x2) = 2x2², 约束: x2 ≤ 5
# 耦合约束: x1 + x2 = 8
# 定义子问题求解器
def subproblem1(y):
"""子系统1: min x1² - y*x1, s.t. 0 ≤ x1 ≤ 10"""
# 无约束最优解: x1 = y/2
# 考虑边界约束
x1_unc = y[0] / 2
x1 = np.clip(x1_unc, 0, 10)
obj = x1**2 - y[0] * x1
return np.array([x1]), obj
def subproblem2(y):
"""子系统2: min 2x2² - y*x2, s.t. 0 ≤ x2 ≤ 5"""
# 无约束最优解: x2 = y[0]/4
# 考虑边界约束
x2_unc = y[0] / 4
x2 = np.clip(x2_unc, 0, 5)
obj = 2 * x2**2 - y[0] * x2
return np.array([x2]), obj
# 耦合约束: x1 + x2 = 8
A = np.array([[1, 1]]) # 耦合矩阵
b = np.array([8]) # 右侧
# 创建对偶分解求解器
solver = DualDecomposition(
n_subsystems=2,
subproblem_solvers=[subproblem1, subproblem2],
coupling_matrix=A,
coupling_rhs=b
)
# 求解
x_opt, y_opt, dual_val = solver.solve(
max_iter=500,
step_size=0.5,
tol=1e-6,
verbose=True
)
print(f"\n最优解: x1 = {x_opt[0]:.4f}, x2 = {x_opt[1]:.4f}")
print(f"对偶变量: y = {y_opt[0]:.4f}")
print(f"耦合约束: x1 + x2 = {x_opt[0] + x_opt[1]:.4f} (目标: 8)")
print(f"子目标值: f1 = {x_opt[0]**2:.4f}, f2 = {2*x_opt[1]**2:.4f}")
print(f"总目标值: {x_opt[0]**2 + 2*x_opt[1]**2:.4f}")
return x_opt, y_opt
if __name__ == "__main__":
x_opt, y_opt = resource_allocation_example()
```
这个实现展示了对偶分解算法的基本框架。在实际的大规模分布式优化问题中,这种分解方法可以显著降低计算复杂度,使问题可以在多个处理器上并行求解。
对偶理论从最初的线性规划对偶,发展到今天的凸优化对偶、拉格朗日对偶、共轭对偶等多个分支,已经成为优化理论的核心支柱之一。无论是理论研究还是工程应用,对偶思想都提供了强大的工具和深刻的洞察。通过本文的讨论和代码实现,希望你能不仅理解对偶理论的基本概念,更能掌握如何在实际问题中应用这些理论。真正的掌握来自于实践——尝试用这些方法解决你遇到的实际优化问题,你会发现对偶理论远不止是数学上的优美,更是解决实际问题的利器。