# 用Python复现经典算法:从零构建K-means聚类与线性回归梯度下降
很多朋友在初学机器学习时,常常陷入一个尴尬的境地:理论公式背得滚瓜烂熟,各种算法的优缺点也能说得头头是道,但一旦打开Jupyter Notebook,面对空白的代码单元格,却不知从何下手。这种“理论懂但不会写代码”的痛点,我当年也深有体会。记得第一次尝试实现K-means时,光是理解如何用numpy数组高效计算距离矩阵,就花了整整一个下午。
今天,我们就来彻底解决这个问题。我不会给你一堆抽象的理论,而是带你亲手用Python从零开始,一步步实现两个最经典的机器学习算法:K-means聚类和线性回归的梯度下降。更重要的是,我会分享在Jupyter Notebook中调试这些算法的实用技巧,以及如何对照scikit-learn的源码来验证自己的实现是否正确。无论你是刚接触编程的入门者,还是想夯实基础的进阶学习者,这篇文章都能让你获得实实在在的代码能力提升。
## 1. 环境准备与数据生成:构建可复现的实验基础
在开始编写算法之前,我们需要搭建一个稳定、可复现的Python环境。我强烈推荐使用Anaconda来管理你的Python环境,它能避免各种依赖冲突问题。如果你已经安装了Anaconda,可以创建一个专门用于机器学习实验的环境:
```bash
conda create -n ml_practice python=3.9
conda activate ml_practice
```
接下来安装必要的库。除了numpy、matplotlib这些基础库外,我们还会安装scikit-learn,但**不是为了直接调用它的算法**,而是为了两个目的:一是生成模拟数据来测试我们的实现,二是对照它的源码来学习工业级代码的写法。
```python
# 安装核心依赖
pip install numpy matplotlib scikit-learn pandas jupyter
```
现在打开Jupyter Notebook,让我们先创建一些用于测试的模拟数据。好的测试数据应该具有清晰的模式,这样我们才能直观地判断算法是否工作正常。
```python
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_blobs, make_regression
import warnings
warnings.filterwarnings('ignore')
# 设置随机种子确保结果可复现
np.random.seed(42)
# 生成K-means测试数据:3个明显分离的簇
X_cluster, y_true = make_blobs(n_samples=300, centers=3,
cluster_std=0.8, random_state=42)
# 生成线性回归测试数据
X_reg, y_reg = make_regression(n_samples=100, n_features=1,
noise=10, random_state=42)
# 可视化生成的数据
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
# 聚类数据可视化
axes[0].scatter(X_cluster[:, 0], X_cluster[:, 1], c=y_true,
cmap='viridis', alpha=0.7, edgecolors='k')
axes[0].set_title('K-means测试数据(3个簇)')
axes[0].set_xlabel('特征1')
axes[0].set_ylabel('特征2')
# 回归数据可视化
axes[1].scatter(X_reg, y_reg, alpha=0.7, edgecolors='k')
axes[1].set_title('线性回归测试数据')
axes[1].set_xlabel('特征')
axes[1].set_ylabel('目标值')
plt.tight_layout()
plt.show()
```
> 提示:在Jupyter Notebook中,我习惯在每个重要的代码单元格后都添加一个Markdown单元格,记录当时的思考过程、遇到的问题以及解决方案。这种"代码+思考笔记"的方式,对于后续回顾和调试非常有帮助。
生成数据后,我们还需要对数据进行标准化处理。虽然对于我们的模拟数据这不是必须的,但这是一个好习惯,因为在实际应用中,不同特征往往具有不同的量纲。
```python
def standardize_data(X):
"""标准化数据:零均值,单位方差"""
mean = np.mean(X, axis=0)
std = np.std(X, axis=0)
# 避免除零错误
std[std == 0] = 1
return (X - mean) / std
# 标准化聚类数据
X_cluster_std = standardize_data(X_cluster)
# 标准化回归数据(只标准化特征,不标准化目标值)
X_reg_std = standardize_data(X_reg)
# 对于回归问题,我们通常也会标准化目标值,但这里为了直观展示,我们保持原样
```
现在我们已经准备好了实验环境。你可能注意到,我特意把数据生成和预处理分开写成独立的函数,而不是把所有代码堆在一个单元格里。这种模块化的写法有几个好处:一是代码更清晰易读,二是方便后续复用,三是便于单元测试。
## 2. K-means聚类算法:从原理到完整实现
K-means可能是最直观的聚类算法,但它的实现细节中藏着不少学问。我们先来回顾一下算法的基本步骤:
1. 随机初始化K个质心
2. 将每个数据点分配到最近的质心
3. 重新计算每个簇的质心(取均值)
4. 重复步骤2-3直到质心不再变化或达到最大迭代次数
听起来很简单,对吧?但在实现时,我们需要考虑效率问题。最耗时的部分是计算每个点到所有质心的距离,如果实现不当,计算复杂度会很高。
### 2.1 核心距离计算与分配逻辑
让我们先实现最核心的部分:距离计算和点分配。这里有一个效率技巧:使用向量化操作而不是循环。
```python
def compute_distances(X, centroids):
"""
计算每个数据点到所有质心的距离
参数:
X: (n_samples, n_features) 数据矩阵
centroids: (n_clusters, n_features) 质心矩阵
返回:
distances: (n_samples, n_clusters) 距离矩阵
"""
# 使用广播机制高效计算欧氏距离的平方
# (a-b)^2 = a^2 - 2ab + b^2
X_squared = np.sum(X**2, axis=1, keepdims=True)
centroids_squared = np.sum(centroids**2, axis=1)
dot_product = X @ centroids.T
distances = X_squared - 2 * dot_product + centroids_squared
return distances
def assign_clusters(X, centroids):
"""
将每个数据点分配到最近的质心
返回:
labels: (n_samples,) 每个点所属的簇标签
"""
distances = compute_distances(X, centroids)
# 找到每个点的最小距离对应的质心索引
labels = np.argmin(distances, axis=1)
return labels
```
这里我使用了向量化的距离计算方式。你可能在学校学过,两点间的欧氏距离公式是√(Σ(x_i - c_i)²),但实际计算时,我们通常计算距离的平方,因为平方根是单调函数,不影响比较结果,还能节省计算量。
### 2.2 质心更新与收敛判断
分配完点之后,我们需要更新质心。这里要注意处理空簇的情况——如果某个质心没有分配到任何点,我们需要重新初始化它。
```python
def update_centroids(X, labels, n_clusters, old_centroids):
"""
更新质心位置
参数:
X: 数据矩阵
labels: 每个点的簇标签
n_clusters: 簇的数量
old_centroids: 旧的质心位置,用于处理空簇
返回:
new_centroids: 更新后的质心
changed: 质心是否发生变化
"""
new_centroids = np.zeros_like(old_centroids)
for k in range(n_clusters):
# 获取属于第k簇的所有点
cluster_points = X[labels == k]
if len(cluster_points) > 0:
# 计算新的质心(均值)
new_centroids[k] = np.mean(cluster_points, axis=0)
else:
# 空簇处理:重新随机初始化
print(f"警告:簇{k}为空,重新初始化质心")
new_centroids[k] = X[np.random.randint(0, len(X))]
# 检查质心是否发生变化
changed = not np.allclose(new_centroids, old_centroids)
return new_centroids, changed
```
### 2.3 完整的K-means实现
现在我们把所有部分组合起来,实现完整的K-means算法。我还会添加一些实用功能,比如记录每次迭代的质心位置用于可视化,以及计算轮廓系数来评估聚类质量。
```python
class KMeansCustom:
"""自定义K-means实现"""
def __init__(self, n_clusters=3, max_iter=100, tol=1e-4, random_state=None):
self.n_clusters = n_clusters
self.max_iter = max_iter
self.tol = tol
self.random_state = random_state
self.centroids = None
self.labels = None
self.centroid_history = [] # 记录质心变化历史
def _initialize_centroids(self, X):
"""初始化质心:随机选择K个数据点"""
if self.random_state is not None:
np.random.seed(self.random_state)
n_samples = X.shape[0]
indices = np.random.choice(n_samples, self.n_clusters, replace=False)
return X[indices]
def fit(self, X):
"""训练K-means模型"""
# 初始化质心
self.centroids = self._initialize_centroids(X)
self.centroid_history.append(self.centroids.copy())
for iteration in range(self.max_iter):
# 分配点到最近的质心
labels = assign_clusters(X, self.centroids)
# 更新质心
new_centroids, changed = update_centroids(
X, labels, self.n_clusters, self.centroids
)
# 记录质心变化
self.centroid_history.append(new_centroids.copy())
# 检查收敛
if not changed:
print(f"在第{iteration+1}次迭代后收敛")
break
self.centroids = new_centroids
self.labels = assign_clusters(X, self.centroids)
return self
def predict(self, X):
"""预测新数据的簇标签"""
if self.centroids is None:
raise ValueError("请先调用fit方法训练模型")
return assign_clusters(X, self.centroids)
def compute_inertia(self, X):
"""计算簇内平方和(inertia)"""
if self.labels is None:
self.labels = self.predict(X)
inertia = 0
for k in range(self.n_clusters):
cluster_points = X[self.labels == k]
if len(cluster_points) > 0:
# 计算簇内所有点到质心的距离平方和
distances = np.sum((cluster_points - self.centroids[k])**2)
inertia += distances
return inertia
```
### 2.4 可视化K-means的迭代过程
理解算法如何工作的最好方式就是可视化它的执行过程。让我们创建一个动画来展示K-means的迭代过程。
```python
def visualize_kmeans_iterations(X, kmeans_model, max_iter_to_show=5):
"""可视化K-means的迭代过程"""
fig, axes = plt.subplots(1, min(len(kmeans_model.centroid_history), max_iter_to_show),
figsize=(15, 4))
if len(kmeans_model.centroid_history) == 1:
axes = [axes]
for i, (ax, centroids) in enumerate(zip(axes, kmeans_model.centroid_history[:max_iter_to_show])):
# 如果是第一次迭代,只有初始质心,没有标签
if i == 0:
ax.scatter(X[:, 0], X[:, 1], alpha=0.5, c='gray')
ax.scatter(centroids[:, 0], centroids[:, 1],
c=range(len(centroids)), marker='X', s=200,
edgecolors='red', linewidths=2)
ax.set_title(f'迭代 {i}: 初始化质心')
else:
# 使用上一次迭代的质心分配标签
labels = assign_clusters(X, kmeans_model.centroid_history[i-1])
ax.scatter(X[:, 0], X[:, 1], c=labels, alpha=0.5, cmap='viridis')
ax.scatter(centroids[:, 0], centroids[:, 1],
c=range(len(centroids)), marker='X', s=200,
edgecolors='red', linewidths=2)
ax.set_title(f'迭代 {i}: 更新质心')
ax.set_xlabel('特征1')
ax.set_ylabel('特征2')
plt.tight_layout()
plt.show()
# 测试我们的K-means实现
kmeans = KMeansCustom(n_clusters=3, max_iter=10, random_state=42)
kmeans.fit(X_cluster_std)
# 可视化迭代过程
visualize_kmeans_iterations(X_cluster_std, kmeans)
```
运行这段代码,你会看到质心如何一步步移动到簇的中心位置。这种可视化不仅有助于理解算法,也是调试的好工具——如果质心移动异常,你就能立即发现问题。
### 2.5 与scikit-learn实现对比
现在让我们验证一下自己的实现是否正确。最好的方法就是与scikit-learn的实现进行对比。
```python
from sklearn.cluster import KMeans as SKLearnKMeans
# 使用scikit-learn的K-means
sklearn_kmeans = SKLearnKMeans(n_clusters=3, random_state=42, max_iter=100)
sklearn_labels = sklearn_kmeans.fit_predict(X_cluster_std)
sklearn_inertia = sklearn_kmeans.inertia_
# 使用我们的实现
our_kmeans = KMeansCustom(n_clusters=3, max_iter=100, random_state=42)
our_kmeans.fit(X_cluster_std)
our_labels = our_kmeans.labels
our_inertia = our_kmeans.compute_inertia(X_cluster_std)
# 比较结果
print("结果对比:")
print(f"scikit-learn inertia: {sklearn_inertia:.4f}")
print(f"我们的实现 inertia: {our_inertia:.4f}")
print(f"差异: {abs(sklearn_inertia - our_inertia):.6f}")
# 检查标签一致性(注意:K-means的标签顺序可能不同)
# 我们可以通过调整标签顺序来比较
from sklearn.metrics import adjusted_rand_score
# 调整标签顺序后比较
score = adjusted_rand_score(sklearn_labels, our_labels)
print(f"调整兰德指数: {score:.4f} (1.0表示完全一致)")
```
如果一切正常,你应该看到两个实现的inertia值非常接近,调整兰德指数接近1.0。如果有差异,可能是初始化不同导致的——K-means对初始质心敏感,可能收敛到不同的局部最优解。
## 3. 线性回归与梯度下降:理解优化过程
线性回归是机器学习中最基础的算法,而梯度下降是理解现代机器学习优化的关键。很多人知道梯度下降的公式,但真正实现时才会遇到各种实际问题。
### 3.1 线性回归的数学基础
简单线性回归的模型是:y = w·x + b,其中w是权重(斜率),b是偏置(截距)。我们的目标是找到使预测误差最小的w和b。
损失函数通常使用均方误差(MSE):
L(w, b) = (1/2m) Σ(y_i - (w·x_i + b))²
这里除以2是为了求导时消去系数,m是样本数量。
### 3.2 批量梯度下降实现
我们先实现最简单的批量梯度下降(Batch Gradient Descent),即每次迭代使用所有样本计算梯度。
```python
class LinearRegressionGD:
"""使用梯度下降的线性回归"""
def __init__(self, learning_rate=0.01, n_iter=1000, random_state=None):
self.learning_rate = learning_rate
self.n_iter = n_iter
self.random_state = random_state
self.w = None # 权重
self.b = None # 偏置
self.loss_history = [] # 记录损失变化
def _initialize_parameters(self, n_features):
"""初始化模型参数"""
if self.random_state is not None:
np.random.seed(self.random_state)
# 小随机数初始化
self.w = np.random.randn(n_features) * 0.01
self.b = 0.0
def _compute_gradients(self, X, y, y_pred):
"""计算梯度"""
m = X.shape[0]
# 误差
error = y_pred - y
# 梯度
dw = (1/m) * X.T @ error
db = (1/m) * np.sum(error)
return dw, db
def _compute_loss(self, y, y_pred):
"""计算均方误差损失"""
m = y.shape[0]
loss = (1/(2*m)) * np.sum((y_pred - y)**2)
return loss
def fit(self, X, y):
"""训练线性回归模型"""
m, n_features = X.shape
# 初始化参数
self._initialize_parameters(n_features)
# 确保y是1D数组
y = y.reshape(-1)
for i in range(self.n_iter):
# 前向传播:计算预测值
y_pred = X @ self.w + self.b
# 计算损失
loss = self._compute_loss(y, y_pred)
self.loss_history.append(loss)
# 计算梯度
dw, db = self._compute_gradients(X, y, y_pred)
# 更新参数
self.w -= self.learning_rate * dw
self.b -= self.learning_rate * db
# 每100次迭代打印进度
if i % 100 == 0:
print(f"迭代 {i}: 损失 = {loss:.4f}")
return self
def predict(self, X):
"""预测"""
if self.w is None:
raise ValueError("请先调用fit方法训练模型")
return X @ self.w + self.b
```
### 3.3 学习率实验:理解这个关键超参数
学习率可能是梯度下降中最重要的超参数。太小会导致收敛太慢,太大会导致震荡甚至发散。让我们通过实验来直观理解这一点。
```python
def experiment_learning_rates(X, y, learning_rates=[0.001, 0.01, 0.1, 0.5]):
"""实验不同学习率对梯度下降的影响"""
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
axes = axes.flatten()
for idx, lr in enumerate(learning_rates):
ax = axes[idx]
# 训练模型
model = LinearRegressionGD(learning_rate=lr, n_iter=500, random_state=42)
model.fit(X, y)
# 绘制损失曲线
ax.plot(model.loss_history)
ax.set_xlabel('迭代次数')
ax.set_ylabel('损失')
ax.set_title(f'学习率 = {lr}')
ax.grid(True, alpha=0.3)
# 添加最终损失值标注
final_loss = model.loss_history[-1]
ax.text(0.7, 0.9, f'最终损失: {final_loss:.4f}',
transform=ax.transAxes, fontsize=10,
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
# 如果是过大的学习率,损失可能会爆炸
if any(np.isnan(loss) or loss > 1e6 for loss in model.loss_history[:10]):
ax.text(0.5, 0.5, '学习率太大!\n梯度下降发散',
transform=ax.transAxes, fontsize=12,
ha='center', va='center',
bbox=dict(boxstyle='round', facecolor='red', alpha=0.3))
plt.tight_layout()
plt.show()
# 运行学习率实验
print("不同学习率对梯度下降的影响:")
experiment_learning_rates(X_reg_std, y_reg)
```
通过这个实验,你会清楚地看到:太小的学习率(如0.001)导致收敛极慢;适中的学习率(如0.01)平稳收敛;太大的学习率(如0.5)可能导致损失震荡甚至发散。
### 3.4 特征缩放的重要性
梯度下降对特征的尺度非常敏感。如果不同特征的尺度差异很大,梯度下降可能会在某个方向上进展太慢,在另一个方向上震荡。让我们通过一个实验来验证这一点。
```python
# 创建具有不同尺度的特征
np.random.seed(42)
X_unscaled = np.random.randn(100, 2)
# 故意让第二个特征的尺度比第一个大100倍
X_unscaled[:, 1] = X_unscaled[:, 1] * 100
# 生成目标值
true_w = np.array([3.0, 2.0])
true_b = 5.0
y_unscaled = X_unscaled @ true_w + true_b + np.random.randn(100) * 10
# 对比标准化前后的梯度下降效果
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# 未标准化的数据
model_unscaled = LinearRegressionGD(learning_rate=0.0001, n_iter=1000)
model_unscaled.fit(X_unscaled, y_unscaled)
axes[0].plot(model_unscaled.loss_history)
axes[0].set_title('未标准化数据 (学习率=0.0001)')
axes[0].set_xlabel('迭代次数')
axes[0].set_ylabel('损失')
axes[0].grid(True, alpha=0.3)
# 标准化后的数据
X_scaled = standardize_data(X_unscaled)
model_scaled = LinearRegressionGD(learning_rate=0.1, n_iter=1000)
model_scaled.fit(X_scaled, y_unscaled)
axes[1].plot(model_scaled.loss_history)
axes[1].set_title('标准化数据 (学习率=0.1)')
axes[1].set_xlabel('迭代次数')
axes[1].set_ylabel('损失')
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("对比结果:")
print(f"未标准化 - 最终损失: {model_unscaled.loss_history[-1]:.4f}")
print(f"标准化 - 最终损失: {model_scaled.loss_history[-1]:.4f}")
print(f"未标准化需要更小的学习率({0.0001}),收敛更慢")
print(f"标准化后可以使用更大的学习率({0.1}),收敛更快")
```
这个实验清楚地展示了特征标准化的重要性。在实际项目中,我几乎总是会对连续特征进行标准化处理,这不仅有助于梯度下降,也使模型更容易解释。
### 3.5 多种梯度下降变体比较
除了批量梯度下降,还有随机梯度下降(SGD)和小批量梯度下降(Mini-batch GD)。让我们实现并比较这三种变体。
```python
class LinearRegressionSGD(LinearRegressionGD):
"""随机梯度下降"""
def fit(self, X, y):
m, n_features = X.shape
self._initialize_parameters(n_features)
y = y.reshape(-1)
for i in range(self.n_iter):
# 随机选择一个样本
idx = np.random.randint(0, m)
X_i = X[idx:idx+1] # 保持2D形状
y_i = y[idx:idx+1]
# 前向传播
y_pred = X_i @ self.w + self.b
# 计算梯度(只基于一个样本)
dw, db = self._compute_gradients(X_i, y_i, y_pred)
# 更新参数
self.w -= self.learning_rate * dw
self.b -= self.learning_rate * db
# 每100次迭代计算一次完整损失
if i % 100 == 0:
y_pred_full = X @ self.w + self.b
loss = self._compute_loss(y, y_pred_full)
self.loss_history.append(loss)
return self
class LinearRegressionMiniBatch(LinearRegressionGD):
"""小批量梯度下降"""
def __init__(self, learning_rate=0.01, n_iter=1000, batch_size=32, random_state=None):
super().__init__(learning_rate, n_iter, random_state)
self.batch_size = batch_size
def fit(self, X, y):
m, n_features = X.shape
self._initialize_parameters(n_features)
y = y.reshape(-1)
for i in range(self.n_iter):
# 随机选择一个小批量
indices = np.random.choice(m, self.batch_size, replace=False)
X_batch = X[indices]
y_batch = y[indices]
# 前向传播
y_pred = X_batch @ self.w + self.b
# 计算梯度
dw, db = self._compute_gradients(X_batch, y_batch, y_pred)
# 更新参数
self.w -= self.learning_rate * dw
self.b -= self.learning_rate * db
# 每100次迭代计算一次完整损失
if i % 100 == 0:
y_pred_full = X @ self.w + self.b
loss = self._compute_loss(y, y_pred_full)
self.loss_history.append(loss)
return self
# 比较三种梯度下降变体
X_test = X_reg_std
y_test = y_reg
# 批量梯度下降
bgd_model = LinearRegressionGD(learning_rate=0.1, n_iter=1000)
bgd_model.fit(X_test, y_test)
# 随机梯度下降
sgd_model = LinearRegressionSGD(learning_rate=0.01, n_iter=10000) # 需要更多迭代
sgd_model.fit(X_test, y_test)
# 小批量梯度下降
mbgd_model = LinearRegressionMiniBatch(learning_rate=0.05, n_iter=2000, batch_size=16)
mbgd_model.fit(X_test, y_test)
# 可视化比较
plt.figure(figsize=(10, 6))
plt.plot(bgd_model.loss_history, label='批量梯度下降 (BGD)', alpha=0.8)
plt.plot(sgd_model.loss_history, label='随机梯度下降 (SGD)', alpha=0.8)
plt.plot(mbgd_model.loss_history, label='小批量梯度下降 (Mini-batch)', alpha=0.8)
plt.xlabel('迭代次数(每100次记录一次)')
plt.ylabel('损失')
plt.title('三种梯度下降变体比较')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
# 打印最终结果对比
print("最终损失对比:")
print(f"批量梯度下降: {bgd_model.loss_history[-1]:.6f}")
print(f"随机梯度下降: {sgd_model.loss_history[-1]:.6f}")
print(f"小批量梯度下降: {mbgd_model.loss_history[-1]:.6f}")
```
从结果中你可以看到:
- **批量梯度下降**:每次迭代稳定,但计算量大
- **随机梯度下降**:震荡明显,但可能跳出局部最优
- **小批量梯度下降**:折中方案,兼具两者的优点
在实际应用中,小批量梯度下降通常是最佳选择,这也是为什么深度学习框架如PyTorch和TensorFlow默认使用这种优化方式。
## 4. Jupyter Notebook调试技巧与scikit-learn源码学习
掌握了算法实现后,让我们来聊聊如何高效调试和学习。Jupyter Notebook有一些独特的调试技巧,而阅读scikit-learn源码则是提升代码质量的最佳途径。
### 4.1 Jupyter Notebook调试技巧
**技巧1:使用`%debug`魔法命令**
当代码抛出异常时,在下一个单元格中输入`%debug`,可以进入交互式调试器。
```python
# 示例:故意制造一个错误来演示调试
def buggy_function(x):
result = x / 0 # 除零错误
return result
try:
buggy_function(10)
except Exception as e:
print(f"捕获到异常: {e}")
# 在这里,你可以打开新的单元格输入 %debug 进行调试
```
**技巧2:使用`%time`和`%timeit`测试代码性能**
```python
# 测试不同实现的性能
import time
# 方法1:使用循环计算距离
def compute_distances_loop(X, centroids):
n_samples = X.shape[0]
n_clusters = centroids.shape[0]
distances = np.zeros((n_samples, n_clusters))
for i in range(n_samples):
for k in range(n_clusters):
distances[i, k] = np.sum((X[i] - centroids[k])**2)
return distances
# 方法2:使用向量化计算(我们的实现)
def compute_distances_vectorized(X, centroids):
X_squared = np.sum(X**2, axis=1, keepdims=True)
centroids_squared = np.sum(centroids**2, axis=1)
dot_product = X @ centroids.T
return X_squared - 2 * dot_product + centroids_squared
# 性能测试
X_test = np.random.randn(1000, 10)
centroids_test = np.random.randn(5, 10)
print("循环实现:")
%timeit compute_distances_loop(X_test, centroids_test)
print("\n向量化实现:")
%timeit compute_distances_vectorized(X_test, centroids_test)
```
你会看到向量化实现比循环实现快几十甚至上百倍。这就是为什么在科学计算中要尽量避免显式循环。
**技巧3:使用`%%writefile`保存代码到文件**
当你的Notebook中的代码变得复杂时,可以将其保存为Python文件,方便导入和测试。
```python
%%writefile my_kmeans.py
"""
自定义K-means实现
"""
import numpy as np
class KMeansCustom:
def __init__(self, n_clusters=3, max_iter=100, random_state=None):
self.n_clusters = n_clusters
self.max_iter = max_iter
self.random_state = random_state
self.centroids = None
self.labels = None
def fit(self, X):
# 简化的实现
n_samples, n_features = X.shape
# 初始化质心
np.random.seed(self.random_state)
indices = np.random.choice(n_samples, self.n_clusters, replace=False)
self.centroids = X[indices]
for _ in range(self.max_iter):
# 分配标签
distances = self._compute_distances(X)
self.labels = np.argmin(distances, axis=1)
# 更新质心
new_centroids = np.array([
X[self.labels == k].mean(axis=0)
for k in range(self.n_clusters)
])
# 检查收敛
if np.allclose(new_centroids, self.centroids):
break
self.centroids = new_centroids
return self
def _compute_distances(self, X):
X_squared = np.sum(X**2, axis=1, keepdims=True)
centroids_squared = np.sum(self.centroids**2, axis=1)
dot_product = X @ self.centroids.T
return X_squared - 2 * dot_product + centroids_squared
def predict(self, X):
distances = self._compute_distances(X)
return np.argmin(distances, axis=1)
print("KMeansCustom类已保存到my_kmeans.py")
```
### 4.2 学习scikit-learn源码的最佳实践
阅读scikit-learn源码是提升编程能力的绝佳方式。以下是我总结的一些学习步骤:
**步骤1:找到目标算法的源码位置**
```python
# 查找KMeans的源码位置
import sklearn.cluster
print(sklearn.cluster.KMeans.__module__)
# 输出:'sklearn.cluster._kmeans'
# 在Jupyter中查看源码
import inspect
source = inspect.getsource(sklearn.cluster.KMeans)
print(source[:500]) # 打印前500个字符
```
**步骤2:对比自己的实现与官方实现**
创建一个对比表格,分析差异:
| 特性 | 我们的实现 | scikit-learn实现 | 学习点 |
|------|-----------|-----------------|--------|
| 初始化方法 | 随机选择点 | k-means++算法 | k-means++能获得更好的初始质心 |
| 空簇处理 | 随机重新初始化 | 选择最远点作为新质心 | 更稳定的空簇处理策略 |
| 距离计算 | 欧氏距离平方 | 可配置的距离度量 | 支持多种距离度量 |
| 收敛判断 | 质心变化小于阈值 | 结合inertia变化 | 更严格的收敛条件 |
| 并行计算 | 不支持 | 支持多线程并行 | 大规模数据下的性能优化 |
**步骤3:提取可复用的代码模式**
从scikit-learn源码中,我们可以学到很多优秀的编程模式:
```python
# 模式1:参数验证装饰器
from sklearn.utils.validation import check_X_y, check_array
def validated_method(X, y=None):
"""使用scikit-learn风格的参数验证"""
# 检查输入数据
X = check_array(X)
if y is not None:
X, y = check_X_y(X, y)
return X, y
# 模式2:使用joblib进行并行计算
from joblib import Parallel, delayed
def parallel_example(data_chunks):
"""并行处理数据块的模式"""
results = Parallel(n_jobs=-1)(
delayed(process_chunk)(chunk)
for chunk in data_chunks
)
return np.concatenate(results)
# 模式3:丰富的类属性和方法
class SklearnStyleModel:
"""模仿scikit-learn的API设计"""
def __init__(self, param1=1, param2=2):
self.param1 = param1
self.param2 = param2
self.is_fitted_ = False
def fit(self, X, y):
# 训练逻辑
self.is_fitted_ = True
return self
def predict(self, X):
if not self.is_fitted_:
raise ValueError("请先调用fit方法")
# 预测逻辑
pass
def score(self, X, y):
# 评分逻辑
pass
def get_params(self, deep=True):
# 获取参数
return {"param1": self.param1, "param2": self.param2}
def set_params(self, **params):
# 设置参数
for key, value in params.items():
setattr(self, key, value)
return self
```
**步骤4:实现一个scikit-learn兼容的版本**
最后,让我们尝试实现一个与scikit-learn API兼容的K-means版本:
```python
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.metrics.pairwise import euclidean_distances
import warnings
class MyKMeans(BaseEstimator, ClusterMixin):
"""与scikit-learn API兼容的K-means实现"""
def __init__(self, n_clusters=3, max_iter=100, tol=1e-4,
random_state=None, verbose=0):
self.n_clusters = n_clusters
self.max_iter = max_iter
self.tol = tol
self.random_state = random_state
self.verbose = verbose
def _initialize_centroids(self, X):
"""简单的k-means++初始化"""
n_samples, n_features = X.shape
if self.random_state is not None:
np.random.seed(self.random_state)
# 第一个质心随机选择
centroids = np.zeros((self.n_clusters, n_features))
centroids[0] = X[np.random.randint(n_samples)]
# 后续质心选择距离现有质心最远的点
for i in range(1, self.n_clusters):
# 计算每个点到最近质心的距离
distances = np.min(euclidean_distances(X, centroids[:i]), axis=1)
# 按距离平方的概率选择下一个质心
probabilities = distances ** 2
probabilities /= probabilities.sum()
next_idx = np.random.choice(n_samples, p=probabilities)
centroids[i] = X[next_idx]
return centroids
def fit(self, X, y=None):
"""训练模型"""
X = check_array(X)
n_samples, n_features = X.shape
# 初始化
self.cluster_centers_ = self._initialize_centroids(X)
self.labels_ = np.zeros(n_samples, dtype=int)
self.inertia_ = np.inf
for iteration in range(self.max_iter):
# 分配标签
distances = euclidean_distances(X, self.cluster_centers_)
new_labels = np.argmin(distances, axis=1)
# 更新质心
new_centers = np.zeros_like(self.cluster_centers_)
for k in range(self.n_clusters):
cluster_points = X[new_labels == k]
if len(cluster_points) > 0:
new_centers[k] = cluster_points.mean(axis=0)
else:
# 空簇处理:选择离当前质心最远的点
farthest_idx = np.argmax(
euclidean_distances(X, [self.cluster_centers_[k]])
)
new_centers[k] = X[farthest_idx]
# 计算inertia
new_inertia = np.sum(np.min(distances, axis=1) ** 2)
# 检查收敛
center_shift = np.linalg.norm(
new_centers - self.cluster_centers_, ord=2
)
if self.verbose > 0:
print(f"Iteration {iteration}, inertia {new_inertia:.4f}, "
f"shift {center_shift:.6f}")
# 更新状态
self.cluster_centers_ = new_centers
self.labels_ = new_labels
# 检查收敛条件
if center_shift < self.tol:
if self.verbose > 0:
print(f"Converged at iteration {iteration}")
break
# 检查inertia是否增加(不应该发生)
if new_inertia > self.inertia_ + self.tol:
warnings.warn(f"Inertia increased from {self.inertia_} to {new_inertia}")
self.inertia_ = new_inertia
self.n_iter_ = iteration + 1
return self
def predict(self, X):
"""预测新数据的簇标签"""
check_is_fitted(self, 'cluster_centers_')
X = check_array(X)
distances = euclidean_distances(X, self.cluster_centers_)
return np.argmin(distances, axis=1)
def fit_predict(self, X, y=None):
"""训练并预测"""
return self.fit(X, y).labels_
def transform(self, X):
"""转换数据到簇距离空间"""
check_is_fitted(self, 'cluster_centers_')
X = check_array(X)
return euclidean_distances(X, self.cluster_centers_)
def score(self, X, y=None):
"""负的inertia作为评分(越大越好)"""
check_is_fitted(self, 'cluster_centers_')
X = check_array(X)
distances = euclidean_distances(X, self.cluster_centers_)
inertia = np.sum(np.min(distances, axis=1) ** 2)
return -inertia
# 测试我们的scikit-learn兼容版本
print("测试MyKMeans类:")
my_kmeans = MyKMeans(n_clusters=3, random_state=42, verbose=1)
my_kmeans.fit(X_cluster_std)
print(f"\n训练完成:")
print(f"迭代次数: {my_kmeans.n_iter_}")
print(f"最终inertia: {my_kmeans.inertia_:.4f}")
print(f"质心形状: {my_kmeans.cluster_centers_.shape}")
# 测试predict方法
test_points = np.random.randn(5, 2)
predictions = my_kmeans.predict(test_points)
print(f"\n测试点预测: {predictions}")
```
通过实现这个scikit-learn兼容的版本,你不仅加深了对K-means算法的理解,还学习了如何构建符合工业标准的机器学习类。这种经验在你未来实现自定义算法时非常有用。
在实现过程中,我特别注意了以下几点,这些都是从scikit-learn源码中学到的:
1. **一致的API设计**:使用`fit`、`predict`、`transform`等标准方法名
2. **参数验证**:使用`check_array`等工具验证输入数据
3. **状态管理**:使用`is_fitted_`属性跟踪模型状态
4. **错误处理**:提供清晰的错误信息
5. **性能考虑**:使用向量化操作,避免不必要的计算
这些实践让我在实际项目中少走了很多弯路。记得有一次,我因为没有正确验证输入数据,导致模型在处理特定格式的数据时崩溃,花了半天时间才找到问题所在。从那以后,我养成了在函数开头验证参数的习惯。
通过今天的学习,你不仅掌握了K-means和线性回归的实现,更重要的是学会了如何从零开始构建机器学习算法,如何调试和优化代码,以及如何从优秀的开源项目中学习编程最佳实践。这些技能比单纯调用现成的库更有价值,它们让你真正理解机器学习的工作原理,并能根据具体问题调整和优化算法。
在实际项目中,我经常需要根据业务需求对标准算法进行修改。有了这些底层实现经验,你就能自信地调整算法,而不是被限制在现有库的功能范围内。比如,你可能需要为K-means添加自定义的距离度量,或者为线性回归添加特殊的正则化项,这些都需要对算法有深入的理解。
最后分享一个小技巧:在Jupyter Notebook中完成算法原型后,我通常会将其重构为标准的Python模块,添加完整的文档字符串和单元测试。这样不仅代码更规范,也方便团队协作和项目维护。机器学习不仅仅是调参和训练模型,写出可维护、可测试的代码同样重要。