# 生态连通性廊道分析完整Python解决方案
基于电路理论的生态连通性分析是一种先进的空间分析方法,能够识别景观中重要的生态廊道[ref_1]。下面我将提供一个完整的Python程序,从阻力面和源地栅格数据出发,最终生成重要生态连通性廊道并进行可视化。
## 1. 环境配置与依赖库安装
首先需要安装必要的Python库:
```python
# 安装必要的库
# pip install gdal rasterio numpy scipy matplotlib seaborn geopandas pandas networkx scikit-learn
```
## 2. 完整程序代码
```python
import numpy as np
import rasterio
from scipy import sparse
from scipy.sparse.linalg import spsolve
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib.patches import Patch
import seaborn as sns
import geopandas as gpd
from sklearn.cluster import DBSCAN
import networkx as nx
import os
from typing import Tuple, Dict, List
class EcologicalConnectivityAnalyzer:
"""
基于电路理论的生态连通性分析类
实现从阻力面和源地到生态廊道生成的完整流程
"""
def __init__(self):
self.resistance_data = None
self.source_data = None
self.current_map = None
self.voltage_map = None
self.connectivity_graph = None
def load_rasters(self, resistance_path: str, source_path: str) -> None:
"""
加载阻力面和源地栅格数据
参数:
resistance_path: 阻力面栅格文件路径
source_path: 源地栅格文件路径
"""
print("正在加载栅格数据...")
# 加载阻力面数据
with rasterio.open(resistance_path) as src:
self.resistance_data = src.read(1)
self.resistance_profile = src.profile
self.resistance_transform = src.transform
self.resistance_bounds = src.bounds
# 加载源地数据
with rasterio.open(source_path) as src:
self.source_data = src.read(1)
self.source_profile = src.profile
# 验证数据一致性
assert self.resistance_data.shape == self.source_data.shape, "阻力面和源地栅格尺寸不一致"
print(f"数据加载完成,栅格尺寸: {self.resistance_data.shape}")
def preprocess_data(self) -> None:
"""
数据预处理
"""
print("正在进行数据预处理...")
# 处理NoData值
self.resistance_data = np.where(self.resistance_data == self.resistance_profile.get('nodata', -9999),
np.nan, self.resistance_data)
self.source_data = np.where(self.source_data == self.source_profile.get('nodata', -9999),
np.nan, self.source_data)
# 确保阻力值为正数
self.resistance_data = np.where(self.resistance_data <= 0, 0.001, self.resistance_data)
# 识别源地(非零值区域)
self.source_locations = np.argwhere(~np.isnan(self.source_data) & (self.source_data > 0))
print(f"识别到 {len(self.source_locations)} 个源地像元")
def build_resistance_graph(self) -> sparse.csr_matrix:
"""
构建阻力图邻接矩阵(8邻域连接)
返回:
稀疏邻接矩阵
"""
print("正在构建阻力图...")
rows, cols = self.resistance_data.shape
n_pixels = rows * cols
# 8邻域方向
directions = [(-1, -1), (-1, 0), (-1, 1),
(0, -1), (0, 1),
(1, -1), (1, 0), (1, 1)]
# 对角线方向的长度因子
diag_factor = np.sqrt(2)
row_indices = []
col_indices = []
data_values = []
for i in range(rows):
for j in range(cols):
if np.isnan(self.resistance_data[i, j]):
continue
current_idx = i * cols + j
current_resistance = self.resistance_data[i, j]
for di, dj in directions:
ni, nj = i + di, j + dj
if 0 <= ni < rows and 0 <= nj < cols:
if np.isnan(self.resistance_data[ni, nj]):
continue
neighbor_idx = ni * cols + nj
neighbor_resistance = self.resistance_data[ni, nj]
# 计算连接电阻(平均电阻)
if di != 0 and dj != 0: # 对角线方向
connection_resistance = diag_factor * (current_resistance + neighbor_resistance) / 2
else: # 正交方向
connection_resistance = (current_resistance + neighbor_resistance) / 2
# 电导是电阻的倒数
conductance = 1.0 / connection_resistance if connection_resistance > 0 else 0
row_indices.append(current_idx)
col_indices.append(neighbor_idx)
data_values.append(conductance)
# 创建稀疏矩阵
graph_matrix = sparse.csr_matrix((data_values, (row_indices, col_indices)),
shape=(n_pixels, n_pixels))
print("阻力图构建完成")
return graph_matrix
def calculate_circuit_metrics(self, graph_matrix: sparse.csr_matrix) -> Tuple[np.ndarray, np.ndarray]:
"""
计算电路理论指标(电流和电压)
参数:
graph_matrix: 邻接矩阵
返回:
current_density: 电流密度图
voltage: 电压图
"""
print("正在计算电路理论指标...")
rows, cols = self.resistance_data.shape
n_pixels = rows * cols
# 构建拉普拉斯矩阵
degree_matrix = sparse.diags(graph_matrix.sum(axis=1).A1, format='csr')
laplacian = degree_matrix - graph_matrix
# 选择源地作为电流源
source_indices = [i * cols + j for i, j in self.source_locations]
# 构建电流源向量(单位电流)
current_source = np.zeros(n_pixels)
current_source[source_indices] = 1.0 / len(source_indices)
# 选择一个接地点(通常选择边界点)
ground_idx = 0 # 左上角作为接地点
current_source[ground_idx] -= 1.0 # 确保电流守恒
# 解决线性系统获得电压
# 固定接地点电压为0
mask = np.ones(n_pixels, dtype=bool)
mask[ground_idx] = False
laplacian_reduced = laplacian[mask, :][:, mask]
current_reduced = current_source[mask]
# 求解电压
voltage_reduced = spsolve(laplacian_reduced, -current_reduced)
voltage = np.zeros(n_pixels)
voltage[mask] = voltage_reduced
voltage[ground_idx] = 0 # 接地点电压为0
# 计算电流密度
current_density = np.zeros((rows, cols))
for i in range(rows):
for j in range(cols):
idx = i * cols + j
if np.isnan(self.resistance_data[i, j]):
current_density[i, j] = np.nan
continue
# 计算每个像元的净电流
neighbors_current = 0
for di, dj in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
ni, nj = i + di, j + dj
if 0 <= ni < rows and 0 <= nj < cols:
nidx = ni * cols + nj
if not np.isnan(self.resistance_data[ni, nj]):
conductance = graph_matrix[idx, nidx]
voltage_diff = voltage[idx] - voltage[nidx]
neighbors_current += conductance * voltage_diff
current_density[i, j] = abs(neighbors_current)
print("电路理论指标计算完成")
return current_density, voltage.reshape(rows, cols)
def identify_corridors(self, current_density: np.ndarray, threshold_percentile: float = 85) -> np.ndarray:
"""
识别重要生态廊道
参数:
current_density: 电流密度图
threshold_percentile: 阈值百分位
返回:
corridor_map: 廊道二值图
"""
print("正在识别生态廊道...")
# 计算阈值
valid_currents = current_density[~np.isnan(current_density)]
threshold = np.percentile(valid_currents, threshold_percentile)
# 创建廊道二值图
corridor_map = np.where(current_density >= threshold, 1, 0)
corridor_map = np.where(np.isnan(current_density), 0, corridor_map)
n_corridor_pixels = np.sum(corridor_map)
total_pixels = np.sum(~np.isnan(current_density))
print(f"识别到 {n_corridor_pixels} 个廊道像元,占总面积的 {n_corridor_pixels/total_pixels*100:.2f}%")
return corridor_map
def build_connectivity_network(self, corridor_map: np.ndarray) -> nx.Graph:
"""
构建连通性网络图
参数:
corridor_map: 廊道二值图
返回:
连通性网络图
"""
print("正在构建连通性网络...")
rows, cols = corridor_map.shape
G = nx.Graph()
# 添加廊道节点
for i in range(rows):
for j in range(cols):
if corridor_map[i, j] == 1:
node_id = f"{i}_{j}"
G.add_node(node_id, pos=(j, rows-i)) # 注意坐标转换
# 连接相邻的廊道像元
for di, dj in [(0, 1), (1, 0)]:
ni, nj = i + di, j + dj
if 0 <= ni < rows and 0 <= nj < cols and corridor_map[ni, nj] == 1:
neighbor_id = f"{ni}_{nj}"
# 计算边的权重(基于阻力值)
weight = (self.resistance_data[i, j] + self.resistance_data[ni, nj]) / 2
G.add_edge(node_id, neighbor_id, weight=weight)
# 添加源地节点并连接到最近的廊道
for idx, (i, j) in enumerate(self.source_locations):
source_id = f"source_{idx}"
G.add_node(source_id, pos=(j, rows-i), type='source')
# 找到最近的廊道节点
min_dist = float('inf')
nearest_corridor = None
for node in G.nodes():
if node.startswith('source'):
continue
node_i, node_j = map(int, node.split('_'))
dist = abs(node_i - i) + abs(node_j - j) # 曼哈顿距离
if dist < min_dist:
min_dist = dist
nearest_corridor = node
if nearest_corridor:
G.add_edge(source_id, nearest_corridor, weight=min_dist)
print(f"连通性网络构建完成,包含 {G.number_of_nodes()} 个节点和 {G.number_of_edges()} 条边")
return G
def analyze_network_centrality(self, G: nx.Graph) -> Dict:
"""
分析网络中心性指标
参数:
G: 连通性网络图
返回:
中心性指标字典
"""
print("正在计算网络中心性...")
centrality_measures = {}
# 度中心性
centrality_measures['degree'] = nx.degree_centrality(G)
# 介数中心性(识别关键廊道)
centrality_measures['betweenness'] = nx.betweenness_centrality(G, weight='weight')
# 接近中心性
centrality_measures['closeness'] = nx.closeness_centrality(G, distance='weight')
return centrality_measures
def run_analysis(self, resistance_path: str, source_path: str,
corridor_threshold: float = 85) -> None:
"""
运行完整分析流程
参数:
resistance_path: 阻力面文件路径
source_path: 源地文件路径
corridor_threshold: 廊道识别阈值百分位
"""
# 1. 数据加载与预处理
self.load_rasters(resistance_path, source_path)
self.preprocess_data()
# 2. 构建阻力图
graph_matrix = self.build_resistance_graph()
# 3. 计算电路理论指标
self.current_density, self.voltage_map = self.calculate_circuit_metrics(graph_matrix)
# 4. 识别生态廊道
self.corridor_map = self.identify_corridors(self.current_density, corridor_threshold)
# 5. 构建连通性网络
self.connectivity_graph = self.build_connectivity_network(self.corridor_map)
# 6. 网络分析
self.centrality_measures = self.analyze_network_centrality(self.connectivity_graph)
print("生态连通性分析完成!")
def visualize_results(self, output_dir: str = "results") -> None:
"""
可视化分析结果
参数:
output_dir: 输出目录
"""
print("正在生成可视化结果...")
os.makedirs(output_dir, exist_ok=True)
# 创建综合可视化图
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
fig.suptitle('生态连通性廊道分析结果', fontsize=16, fontweight='bold')
# 1. 阻力面可视化
ax1 = axes[0, 0]
resistance_masked = np.ma.array(self.resistance_data, mask=np.isnan(self.resistance_data))
im1 = ax1.imshow(resistance_masked, cmap='YlOrRd', aspect='equal')
ax1.set_title('阻力面分布')
ax1.set_xlabel('列')
ax1.set_ylabel('行')
plt.colorbar(im1, ax=ax1, label='阻力值')
# 2. 源地分布
ax2 = axes[0, 1]
source_masked = np.ma.array(self.source_data, mask=np.isnan(self.source_data))
im2 = ax2.imshow(source_masked, cmap='viridis', aspect='equal')
ax2.set_title('源地分布')
ax2.set_xlabel('列')
ax2.set_ylabel('行')
plt.colorbar(im2, ax=ax2, label='源地强度')
# 3. 电流密度图
ax3 = axes[0, 2]
current_masked = np.ma.array(self.current_density, mask=np.isnan(self.current_density))
im3 = ax3.imshow(current_masked, cmap='plasma', aspect='equal')
ax3.set_title('电流密度分布')
ax3.set_xlabel('列')
ax3.set_ylabel('行')
plt.colorbar(im3, ax=ax3, label='电流密度')
# 4. 电压分布
ax4 = axes[1, 0]
voltage_masked = np.ma.array(self.voltage_map, mask=np.isnan(self.voltage_map))
im4 = ax4.imshow(voltage_masked, cmap='coolwarm', aspect='equal')
ax4.set_title('电压分布')
ax4.set_xlabel('列')
ax4.set_ylabel('行')
plt.colorbar(im4, ax=ax4, label='电压值')
# 5. 生态廊道识别
ax5 = axes[1, 1]
corridor_display = np.where(self.corridor_map == 1, 1, np.nan)
background = np.ma.array(self.resistance_data, mask=np.isnan(self.resistance_data))
ax5.imshow(background, cmap='Greys', alpha=0.7, aspect='equal')
im5 = ax5.imshow(corridor_display, cmap='Greens', alpha=0.8, aspect='equal')
ax5.set_title('重要生态廊道')
ax5.set_xlabel('列')
ax5.set_ylabel('行')
# 创建图例
legend_elements = [Patch(facecolor='green', alpha=0.8, label='生态廊道')]
ax5.legend(handles=legend_elements, loc='upper right')
# 6. 网络中心性分析
ax6 = axes[1, 2]
# 提取节点位置和介数中心性
pos = nx.get_node_attributes(self.connectivity_graph, 'pos')
betweenness = self.centrality_measures['betweenness']
# 绘制网络
node_sizes = [5000 * betweenness[node] + 100 for node in self.connectivity_graph.nodes()]
node_colors = [betweenness[node] for node in self.connectivity_graph.nodes()]
# 区分源地节点和廊道节点
source_nodes = [node for node in self.connectivity_graph.nodes() if 'source' in node]
corridor_nodes = [node for node in self.connectivity_graph.nodes() if 'source' not in node]
# 绘制廊道节点
nx.draw_networkx_nodes(self.connectivity_graph, pos,
nodelist=corridor_nodes,
node_size=[node_sizes[self.connectivity_graph.nodes().index(node)]
for node in corridor_nodes],
node_color=[node_colors[self.connectivity_graph.nodes().index(node)]
for node in corridor_nodes],
cmap='YlGnBu', alpha=0.8, ax=ax6)
# 绘制源地节点
nx.draw_networkx_nodes(self.connectivity_graph, pos,
nodelist=source_nodes,
node_size=300, node_color='red',
alpha=0.8, ax=ax6)
# 绘制边
nx.draw_networkx_edges(self.connectivity_graph, pos, alpha=0.3,
edge_color='gray', ax=ax6)
ax6.set_title('连通性网络与中心性分析')
ax6.set_xlabel('X坐标')
ax6.set_ylabel('Y坐标')
ax6.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(f'{output_dir}/comprehensive_analysis.png', dpi=300, bbox_inches='tight')
plt.close()
# 生成廊道重要性统计图
self._plot_corridor_statistics(output_dir)
print(f"可视化结果已保存至 {output_dir} 目录")
def _plot_corridor_statistics(self, output_dir: str) -> None:
"""
绘制廊道统计信息
参数:
output_dir: 输出目录
"""
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# 电流密度分布直方图
ax1 = axes[0]
valid_currents = self.current_density[~np.isnan(self.current_density)]
ax1.hist(valid_currents, bins=50, alpha=0.7, color='skyblue', edgecolor='black')
ax1.axvline(np.percentile(valid_currents, 85), color='red', linestyle='--',
label=f'85%分位数: {np.percentile(valid_currents, 85):.4f}')
ax1.set_xlabel('电流密度')
ax1.set_ylabel('频数')
ax1.set_title('电流密度分布')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 介数中心性分布
ax2 = axes[1]
betweenness_values = list(self.centrality_measures['betweenness'].values())
ax2.hist(betweenness_values, bins=30, alpha=0.7, color='lightgreen', edgecolor='black')
ax2.set_xlabel('介数中心性')
ax2.set_ylabel('频数')
ax2.set_title('网络节点介数中心性分布')
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(f'{output_dir}/statistical_analysis.png', dpi=300, bbox_inches='tight')
plt.close()
def save_results(self, output_dir: str = "results") -> None:
"""
保存分析结果
参数:
output_dir: 输出目录
"""
print("正在保存分析结果...")
os.makedirs(output_dir, exist_ok=True)
# 更新元数据
output_profile = self.resistance_profile.copy()
output_profile.update({
'dtype': 'float32',
'nodata': -9999
})
# 保存电流密度图
with rasterio.open(f'{output_dir}/current_density.tif', 'w', **output_profile) as dst:
current_output = np.where(np.isnan(self.current_density), -9999, self.current_density)
dst.write(current_output.astype('float32'), 1)
# 保存电压图
with rasterio.open(f'{output_dir}/voltage_map.tif', 'w', **output_profile) as dst:
voltage_output = np.where(np.isnan(self.voltage_map), -9999, self.voltage_map)
dst.write(voltage_output.astype('float32'), 1)
# 保存廊道图
output_profile.update({'dtype': 'uint8'})
with rasterio.open(f'{output_dir}/ecological_corridors.tif', 'w', **output_profile) as dst:
corridor_output = np.where(np.isnan(self.resistance_data), 255, self.corridor_map)
dst.write(corridor_output.astype('uint8'), 1)
# 保存网络中心性结果
import json
centrality_data = {
'betweenness_centrality': self.centrality_measures['betweenness'],
'degree_centrality': self.centrality_measures['degree'],
'closeness_centrality': self.centrality_measures['closeness']
}
with open(f'{output_dir}/network_centrality.json', 'w') as f:
json.dump(centrality_data, f, indent=2)
# 保存分析摘要
summary = {
'source_count': len(self.source_locations),
'corridor_pixel_count': int(np.sum(self.corridor_map)),
'total_pixel_count': int(np.sum(~np.isnan(self.resistance_data))),
'mean_current_density': float(np.nanmean(self.current_density)),
'max_current_density': float(np.nanmax(self.current_density)),
'network_nodes': self.connectivity_graph.number_of_nodes(),
'network_edges': self.connectivity_graph.number_of_edges()
}
with open(f'{output_dir}/analysis_summary.json', 'w') as f:
json.dump(summary, f, indent=2)
print(f"分析结果已保存至 {output_dir} 目录")
# 使用示例
def main():
"""
主函数 - 演示如何使用生态连通性分析器
"""
# 初始化分析器
analyzer = EcologicalConnectivityAnalyzer()
# 运行分析(请替换为您的实际文件路径)
resistance_file = "path/to/your/resistance.tif"
source_file = "path/to/your/source.tif"
try:
# 执行完整分析流程
analyzer.run_analysis(resistance_file, source_file, corridor_threshold=85)
# 生成可视化结果
analyzer.visualize_results("ecological_connectivity_results")
# 保存分析结果
analyzer.save_results("ecological_connectivity_results")
print("生态连通性廊道分析圆满完成!")
except Exception as e:
print(f"分析过程中出现错误: {e}")
print("请检查输入文件路径和数据格式")
if __name__ == "__main__":
main()
```
## 3. 程序核心功能详解
### 3.1 电路理论实现原理
本程序基于电路理论实现生态连通性分析[ref_1],核心逻辑如下:
| 电路概念 | 生态学对应 | 实现方法 |
|---------|------------|----------|
| 电阻 | 景观阻力 | 阻力面栅格值 |
| 电流源 | 生态源地 | 源地栅格标识 |
| 电压 | 生态位势 | 线性系统求解 |
| 电流密度 | 连通性强度 | 廊道识别依据 |
### 3.2 关键技术步骤
```python
# 关键步骤1: 构建8邻域阻力图
def build_resistance_graph(self):
# 实现8方向邻域连接,考虑对角线距离因子√2
# 电导计算: conductance = 1/resistance
# 关键步骤2: 求解电路方程
def calculate_circuit_metrics(self, graph_matrix):
# 构建拉普拉斯矩阵: L = D - A
# 求解线性系统: L·v = -i
# 计算电流密度: I = G·Δv
```
### 3.3 廊道识别算法
程序采用百分位数阈值法识别重要廊道:
- 计算电流密度的85%分位数作为阈值
- 高于阈值的区域被识别为重要生态廊道
- 基于介数中心性识别关键廊道节点
## 4. 数据输入要求
### 4.1 输入数据格式
```python
# 阻力面数据要求
# - GeoTIFF格式
# - 像元值表示景观阻力(正值)
# - NoData值用于表示屏障
# 源地数据要求
# - 与阻力面相同的坐标系和范围
# - 源地区域用正值标识
# - 非源地区域用NoData或0值
```
### 4.2 数据预处理
程序自动处理以下问题:
- NoData值识别与处理
- 坐标系统一性验证
- 阻力值正数化处理
- 源地位置提取
## 5. 输出结果说明
### 5.1 主要输出文件
| 文件类型 | 内容说明 | 用途 |
|----------|----------|------|
| current_density.tif | 电流密度分布图 | 识别连通性热点 |
| voltage_map.tif | 电压分布图 | 分析生态位势梯度 |
| ecological_corridors.tif | 廊道二值图 | 廊道空间分布 |
| network_centrality.json | 网络中心性指标 | 识别关键节点 |
### 5.2 可视化输出
程序生成的综合可视化包括:
1. **阻力面分布** - 显示景观阻力空间格局
2. **源地分布** - 标识生态源地区域
3. **电流密度** - 连通性强度空间分布
4. **电压分布** - 生态位势格局
5. **生态廊道** - 识别的重要连通通道
6. **网络分析** - 廊道网络结构与关键节点
## 6. 应用建议与参数调整
### 6.1 参数调优建议
```python
# 廊道识别阈值调整
analyzer.run_analysis(resistance_file, source_file, corridor_threshold=90) # 更严格
analyzer.run_analysis(resistance_file, source_file, corridor_threshold=80) # 更宽松
# 邻域连接方式(可修改为4邻域)
# 在build_resistance_graph方法中修改directions列表
```
### 6.2 结果解读指南
- **高电流密度区域**: 生态流的主要通道,优先保护
- **高介数中心性节点**: 网络中的关键连接点
- **连续廊道**: 物种迁移的潜在路径
- **断裂区域**: 需要生态修复的障碍点
本程序提供了一套完整的从原始栅格数据到生态廊道识别与可视化的解决方案,基于成熟的电路理论方法[ref_1],能够有效支持生态保护规划和景观连通性评估工作。